diff --git a/contrib/mesos/pkg/queue/historical.go b/contrib/mesos/pkg/queue/historical.go index 492cb2a9b69..6fe65f21e2b 100644 --- a/contrib/mesos/pkg/queue/historical.go +++ b/contrib/mesos/pkg/queue/historical.go @@ -277,7 +277,7 @@ func (f *HistoricalFIFO) pop(cancel chan struct{}) interface{} { } } -func (f *HistoricalFIFO) Replace(objs []interface{}) error { +func (f *HistoricalFIFO) Replace(objs []interface{}, resourceVersion string) error { notifications := make([]Entry, 0, len(objs)) defer func() { for _, e := range notifications { diff --git a/contrib/mesos/pkg/queue/historical_test.go b/contrib/mesos/pkg/queue/historical_test.go index 4477601beda..587629be6df 100644 --- a/contrib/mesos/pkg/queue/historical_test.go +++ b/contrib/mesos/pkg/queue/historical_test.go @@ -122,7 +122,7 @@ func TestFIFO_addUpdate(t *testing.T) { func TestFIFO_addReplace(t *testing.T) { f := NewHistorical(nil) f.Add(&testObj{"foo", 10}) - f.Replace([]interface{}{&testObj{"foo", 15}}) + f.Replace([]interface{}{&testObj{"foo", 15}}, "0") got := make(chan *testObj, 2) go func() { for { diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index f6c400faeb7..56cc419abf6 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -893,11 +893,11 @@ func (psa *podStoreAdapter) Get(obj interface{}) (interface{}, bool, error) { // Replace will delete the contents of the store, using instead the // given map. This store implementation does NOT take ownership of the map. -func (psa *podStoreAdapter) Replace(objs []interface{}) error { +func (psa *podStoreAdapter) Replace(objs []interface{}, resourceVersion string) error { newobjs := make([]interface{}, len(objs)) for i, v := range objs { pod := v.(*api.Pod) newobjs[i] = &Pod{Pod: pod} } - return psa.FIFO.Replace(newobjs) + return psa.FIFO.Replace(newobjs, resourceVersion) } diff --git a/pkg/client/unversioned/cache/delta_fifo.go b/pkg/client/unversioned/cache/delta_fifo.go index 96315b8d590..4b432208ba4 100644 --- a/pkg/client/unversioned/cache/delta_fifo.go +++ b/pkg/client/unversioned/cache/delta_fifo.go @@ -307,7 +307,7 @@ func (f *DeltaFIFO) Pop() interface{} { // 'f' takes ownership of the map, you should not reference the map again // after calling this function. f's queue is reset, too; upon return, it // will contain the items in the map, in no particular order. -func (f *DeltaFIFO) Replace(list []interface{}) error { +func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { f.lock.Lock() defer f.lock.Unlock() for _, item := range list { diff --git a/pkg/client/unversioned/cache/delta_fifo_test.go b/pkg/client/unversioned/cache/delta_fifo_test.go index 24b2e7739fc..1b2288ab006 100644 --- a/pkg/client/unversioned/cache/delta_fifo_test.go +++ b/pkg/client/unversioned/cache/delta_fifo_test.go @@ -97,7 +97,7 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) { ) f.Add(mkFifoObj("foo", 10)) f.Update(mkFifoObj("foo", 12)) - f.Replace([]interface{}{mkFifoObj("foo", 20)}) + f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0") f.Delete(mkFifoObj("foo", 15)) f.Delete(mkFifoObj("foo", 18)) // flush the last one out expect := []DeltaType{Added, Updated, Sync, Deleted} @@ -165,7 +165,7 @@ func TestDeltaFIFO_enqueueing(t *testing.T) { func TestDeltaFIFO_addReplace(t *testing.T) { f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) f.Add(mkFifoObj("foo", 10)) - f.Replace([]interface{}{mkFifoObj("foo", 15)}) + f.Replace([]interface{}{mkFifoObj("foo", 15)}, "0") got := make(chan testFifoObject, 2) go func() { for { @@ -197,7 +197,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { }), ) f.Delete(mkFifoObj("baz", 10)) - f.Replace([]interface{}{mkFifoObj("foo", 5)}) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") expectedList := []Deltas{ {{Deleted, mkFifoObj("baz", 10)}}, diff --git a/pkg/client/unversioned/cache/expiration_cache.go b/pkg/client/unversioned/cache/expiration_cache.go index 077a38755dc..e6d67c14274 100644 --- a/pkg/client/unversioned/cache/expiration_cache.go +++ b/pkg/client/unversioned/cache/expiration_cache.go @@ -166,7 +166,7 @@ func (c *ExpirationCache) Delete(obj interface{}) error { // Replace will convert all items in the given list to TimestampedEntries // before attempting the replace operation. The replace operation will // delete the contents of the ExpirationCache `c`. -func (c *ExpirationCache) Replace(list []interface{}) error { +func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error { items := map[string]interface{}{} ts := c.clock.Now() for _, item := range list { @@ -176,7 +176,7 @@ func (c *ExpirationCache) Replace(list []interface{}) error { } items[key] = ×tampedEntry{item, ts} } - c.cacheStorage.Replace(items) + c.cacheStorage.Replace(items, resourceVersion) return nil } diff --git a/pkg/client/unversioned/cache/fifo.go b/pkg/client/unversioned/cache/fifo.go index 45e5f669411..b56687ee0fd 100644 --- a/pkg/client/unversioned/cache/fifo.go +++ b/pkg/client/unversioned/cache/fifo.go @@ -188,7 +188,7 @@ func (f *FIFO) Pop() interface{} { // 'f' takes ownership of the map, you should not reference the map again // after calling this function. f's queue is reset, too; upon return, it // will contain the items in the map, in no particular order. -func (f *FIFO) Replace(list []interface{}) error { +func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { items := map[string]interface{}{} for _, item := range list { key, err := f.keyFunc(item) diff --git a/pkg/client/unversioned/cache/fifo_test.go b/pkg/client/unversioned/cache/fifo_test.go index cc6f27b7940..bf99b5aa152 100644 --- a/pkg/client/unversioned/cache/fifo_test.go +++ b/pkg/client/unversioned/cache/fifo_test.go @@ -107,7 +107,7 @@ func TestFIFO_addUpdate(t *testing.T) { func TestFIFO_addReplace(t *testing.T) { f := NewFIFO(testFifoObjectKeyFunc) f.Add(mkFifoObj("foo", 10)) - f.Replace([]interface{}{mkFifoObj("foo", 15)}) + f.Replace([]interface{}{mkFifoObj("foo", 15)}, "15") got := make(chan testFifoObject, 2) go func() { for { diff --git a/pkg/client/unversioned/cache/poller.go b/pkg/client/unversioned/cache/poller.go deleted file mode 100644 index ef73ae29f31..00000000000 --- a/pkg/client/unversioned/cache/poller.go +++ /dev/null @@ -1,86 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cache - -import ( - "time" - - "github.com/golang/glog" - "k8s.io/kubernetes/pkg/util" -) - -// Enumerator should be able to return the list of objects to be synced with -// one object at a time. -type Enumerator interface { - Len() int - Get(index int) (object interface{}) -} - -// GetFunc should return an enumerator that you wish the Poller to process. -type GetFunc func() (Enumerator, error) - -// Poller is like Reflector, but it periodically polls instead of watching. -// This is intended to be a workaround for api objects that don't yet support -// watching. -type Poller struct { - getFunc GetFunc - period time.Duration - store Store -} - -// NewPoller constructs a new poller. Note that polling probably doesn't make much -// sense to use along with the FIFO queue. The returned Poller will call getFunc and -// sync the objects in 'store' with the returned Enumerator, waiting 'period' between -// each call. It probably only makes sense to use a poller if you're treating the -// store as read-only. -func NewPoller(getFunc GetFunc, period time.Duration, store Store) *Poller { - return &Poller{ - getFunc: getFunc, - period: period, - store: store, - } -} - -// Run begins polling. It starts a goroutine and returns immediately. -func (p *Poller) Run() { - go util.Until(p.run, p.period, util.NeverStop) -} - -// RunUntil begins polling. It starts a goroutine and returns immediately. -// It will stop when the stopCh is closed. -func (p *Poller) RunUntil(stopCh <-chan struct{}) { - go util.Until(p.run, p.period, stopCh) -} - -func (p *Poller) run() { - e, err := p.getFunc() - if err != nil { - glog.Errorf("failed to list: %v", err) - return - } - p.sync(e) -} - -func (p *Poller) sync(e Enumerator) { - items := []interface{}{} - for i := 0; i < e.Len(); i++ { - object := e.Get(i) - items = append(items, object) - } - - p.store.Replace(items) -} diff --git a/pkg/client/unversioned/cache/poller_test.go b/pkg/client/unversioned/cache/poller_test.go deleted file mode 100644 index 2d3951d8cbd..00000000000 --- a/pkg/client/unversioned/cache/poller_test.go +++ /dev/null @@ -1,132 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cache - -import ( - "errors" - "reflect" - "testing" - "time" -) - -func testPairKeyFunc(obj interface{}) (string, error) { - return obj.(testPair).id, nil -} - -type testPair struct { - id string - obj interface{} -} -type testEnumerator []testPair - -func (t testEnumerator) Len() int { return len(t) } -func (t testEnumerator) Get(i int) interface{} { - return t[i] -} - -func TestPoller_sync(t *testing.T) { - table := []struct { - // each step simulates the list that a getFunc would receive. - steps [][]testPair - }{ - { - steps: [][]testPair{ - { - {"foo", "foo1"}, - {"bar", "bar1"}, - {"baz", "baz1"}, - {"qux", "qux1"}, - }, { - {"foo", "foo2"}, - {"bar", "bar2"}, - {"qux", "qux2"}, - }, { - {"bar", "bar3"}, - {"baz", "baz2"}, - {"qux", "qux3"}, - }, { - {"qux", "qux4"}, - }, { - {"foo", "foo3"}, - }, - }, - }, - } - - for testCase, item := range table { - s := NewStore(testPairKeyFunc) - // This is a unit test for the sync function, hence the nil getFunc. - p := NewPoller(nil, 0, s) - for line, pairs := range item.steps { - p.sync(testEnumerator(pairs)) - - list := s.List() - for _, pair := range pairs { - foundInList := false - for _, listItem := range list { - id, _ := testPairKeyFunc(listItem) - if pair.id == id { - foundInList = true - } - } - if !foundInList { - t.Errorf("%v, %v: expected to find list entry for %v, but did not.", testCase, line, pair.id) - continue - } - found, ok, _ := s.Get(pair) - if !ok { - t.Errorf("%v, %v: unexpected absent entry for %v", testCase, line, pair.id) - continue - } - if e, a := pair.obj, found.(testPair).obj; !reflect.DeepEqual(e, a) { - t.Errorf("%v, %v: expected %v, got %v for %v", testCase, line, e, a, pair.id) - } - } - if e, a := len(pairs), len(list); e != a { - t.Errorf("%v, %v: expected len %v, got %v", testCase, line, e, a) - } - } - } -} - -func TestPoller_Run(t *testing.T) { - stopCh := make(chan struct{}) - defer func() { stopCh <- struct{}{} }() - s := NewStore(testPairKeyFunc) - const count = 10 - var called = 0 - done := make(chan struct{}) - NewPoller(func() (Enumerator, error) { - called++ - if called == count { - close(done) - } - // test both error and regular returns. - if called&1 == 0 { - return testEnumerator{}, nil - } - return nil, errors.New("transient error") - }, time.Millisecond, s).RunUntil(stopCh) - - // The test here is that we get called at least count times. - <-done - - // We never added anything, verify that. - if e, a := 0, len(s.List()); e != a { - t.Errorf("expected %v, got %v", e, a) - } -} diff --git a/pkg/client/unversioned/cache/reflector.go b/pkg/client/unversioned/cache/reflector.go index 85a5ff58ebb..4c2d664205a 100644 --- a/pkg/client/unversioned/cache/reflector.go +++ b/pkg/client/unversioned/cache/reflector.go @@ -234,12 +234,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err for _, item := range items { found = append(found, item) } - - myStore, ok := r.store.(*WatchCache) - if ok { - return myStore.ReplaceWithVersion(found, resourceVersion) - } - return r.store.Replace(found) + return r.store.Replace(found, resourceVersion) } // watchHandler watches w and keeps *resourceVersion up to date. diff --git a/pkg/client/unversioned/cache/reflector_test.go b/pkg/client/unversioned/cache/reflector_test.go index e592638cd41..c57b723ae7b 100644 --- a/pkg/client/unversioned/cache/reflector_test.go +++ b/pkg/client/unversioned/cache/reflector_test.go @@ -359,34 +359,3 @@ func TestReflector_ListAndWatchWithErrors(t *testing.T) { r.ListAndWatch(util.NeverStop) } } - -func TestReflectorForWatchCache(t *testing.T) { - store := NewWatchCache(5) - - { - _, version := store.ListWithVersion() - if version != 0 { - t.Errorf("unexpected resource version: %d", version) - } - } - - lw := &testLW{ - WatchFunc: func(rv string) (watch.Interface, error) { - fw := watch.NewFake() - go fw.Stop() - return fw, nil - }, - ListFunc: func() (runtime.Object, error) { - return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "10"}}, nil - }, - } - r := NewReflector(lw, &api.Pod{}, store, 0) - r.ListAndWatch(util.NeverStop) - - { - _, version := store.ListWithVersion() - if version != 10 { - t.Errorf("unexpected resource version: %d", version) - } - } -} diff --git a/pkg/client/unversioned/cache/store.go b/pkg/client/unversioned/cache/store.go index 3972db9b9c8..ce16f9c8b74 100644 --- a/pkg/client/unversioned/cache/store.go +++ b/pkg/client/unversioned/cache/store.go @@ -43,7 +43,7 @@ type Store interface { // Replace will delete the contents of the store, using instead the // given list. Store takes ownership of the list, you should not reference // it after calling this function. - Replace([]interface{}) error + Replace([]interface{}, string) error } // KeyFunc knows how to make a key from an object. Implementations should be deterministic. @@ -193,7 +193,7 @@ func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) // Replace will delete the contents of 'c', using instead the given list. // 'c' takes ownership of the list, you should not reference the list again // after calling this function. -func (c *cache) Replace(list []interface{}) error { +func (c *cache) Replace(list []interface{}, resourceVersion string) error { items := map[string]interface{}{} for _, item := range list { key, err := c.keyFunc(item) @@ -202,7 +202,7 @@ func (c *cache) Replace(list []interface{}) error { } items[key] = item } - c.cacheStorage.Replace(items) + c.cacheStorage.Replace(items, resourceVersion) return nil } diff --git a/pkg/client/unversioned/cache/store_test.go b/pkg/client/unversioned/cache/store_test.go index f3d2e2fb73c..2d3b153af7a 100644 --- a/pkg/client/unversioned/cache/store_test.go +++ b/pkg/client/unversioned/cache/store_test.go @@ -70,7 +70,7 @@ func doTestStore(t *testing.T, store Store) { store.Replace([]interface{}{ mkObj("foo", "foo"), mkObj("bar", "bar"), - }) + }, "0") { found := util.StringSet{} diff --git a/pkg/client/unversioned/cache/thread_safe_store.go b/pkg/client/unversioned/cache/thread_safe_store.go index 5480f39a9ca..20113937890 100644 --- a/pkg/client/unversioned/cache/thread_safe_store.go +++ b/pkg/client/unversioned/cache/thread_safe_store.go @@ -41,7 +41,7 @@ type ThreadSafeStore interface { Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string - Replace(map[string]interface{}) + Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) ListIndexFuncValues(name string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) @@ -112,7 +112,7 @@ func (c *threadSafeMap) ListKeys() []string { return list } -func (c *threadSafeMap) Replace(items map[string]interface{}) { +func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) { c.lock.Lock() defer c.lock.Unlock() c.items = items diff --git a/pkg/client/unversioned/cache/undelta_store.go b/pkg/client/unversioned/cache/undelta_store.go index 5cd6378eaf8..4a8a4500e34 100644 --- a/pkg/client/unversioned/cache/undelta_store.go +++ b/pkg/client/unversioned/cache/undelta_store.go @@ -66,8 +66,8 @@ func (u *UndeltaStore) Delete(obj interface{}) error { return nil } -func (u *UndeltaStore) Replace(list []interface{}) error { - if err := u.Store.Replace(list); err != nil { +func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error { + if err := u.Store.Replace(list, resourceVersion); err != nil { return err } u.PushFunc(u.Store.List()) diff --git a/pkg/client/unversioned/cache/undelta_store_test.go b/pkg/client/unversioned/cache/undelta_store_test.go index d6aa3738a67..c14b7a80087 100644 --- a/pkg/client/unversioned/cache/undelta_store_test.go +++ b/pkg/client/unversioned/cache/undelta_store_test.go @@ -120,7 +120,7 @@ func TestReplaceCallsPush(t *testing.T) { m := []interface{}{mkObj("a", 1)} - u.Replace(m) + u.Replace(m, "0") if callcount != 1 { t.Errorf("Expected 1 calls, got %d", callcount) } diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index 6249071ad03..014c82f5004 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -910,7 +910,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { // This should have no effect, since we've deleted the rc. podExp.Seen(1, 0) - manager.podStore.Store.Replace(make([]interface{}, 0)) + manager.podStore.Store.Replace(make([]interface{}, 0), "0") manager.syncReplicationController(getKey(rc, t)) validateSyncReplication(t, &fakePodControl, 0, 0) } diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index c3a8c7ae975..ee76b029a1f 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -86,7 +86,7 @@ type Cacher struct { storage Interface // "sliding window" of recent changes of objects and the current state. - watchCache *cache.WatchCache + watchCache *watchCache reflector *cache.Reflector // Registered watchers. @@ -104,7 +104,7 @@ type Cacher struct { // internal cache and updating its cache in the background based on the given // configuration. func NewCacher(config CacherConfig) *Cacher { - watchCache := cache.NewWatchCache(config.CacheCapacity) + watchCache := newWatchCache(config.CacheCapacity) listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) cacher := &Cacher{ @@ -272,7 +272,7 @@ func (c *Cacher) Codec() runtime.Codec { return c.storage.Codec() } -func (c *Cacher) processEvent(event cache.WatchCacheEvent) { +func (c *Cacher) processEvent(event watchCacheEvent) { c.Lock() defer c.Unlock() for _, watcher := range c.watchers { @@ -361,16 +361,16 @@ func (lw *cacherListerWatcher) Watch(resourceVersion string) (watch.Interface, e // cacherWatch implements watch.Interface type cacheWatcher struct { sync.Mutex - input chan cache.WatchCacheEvent + input chan watchCacheEvent result chan watch.Event filter FilterFunc stopped bool forget func() } -func newCacheWatcher(initEvents []cache.WatchCacheEvent, filter FilterFunc, forget func()) *cacheWatcher { +func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func()) *cacheWatcher { watcher := &cacheWatcher{ - input: make(chan cache.WatchCacheEvent, 10), + input: make(chan watchCacheEvent, 10), result: make(chan watch.Event, 10), filter: filter, stopped: false, @@ -400,11 +400,11 @@ func (c *cacheWatcher) stop() { } } -func (c *cacheWatcher) add(event cache.WatchCacheEvent) { +func (c *cacheWatcher) add(event watchCacheEvent) { c.input <- event } -func (c *cacheWatcher) sendWatchCacheEvent(event cache.WatchCacheEvent) { +func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) { curObjPasses := event.Type != watch.Deleted && c.filter(event.Object) oldObjPasses := false if event.PrevObject != nil { @@ -430,7 +430,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event cache.WatchCacheEvent) { } } -func (c *cacheWatcher) process(initEvents []cache.WatchCacheEvent) { +func (c *cacheWatcher) process(initEvents []watchCacheEvent) { for _, event := range initEvents { c.sendWatchCacheEvent(event) } diff --git a/pkg/client/unversioned/cache/watch_cache.go b/pkg/storage/watch_cache.go similarity index 74% rename from pkg/client/unversioned/cache/watch_cache.go rename to pkg/storage/watch_cache.go index a40c83a7cd6..d41a2b7960b 100644 --- a/pkg/client/unversioned/cache/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package cache +package storage import ( "fmt" @@ -23,20 +23,16 @@ import ( "sync" "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/client/unversioned/cache" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/watch" ) -// TODO(wojtek-t): All structure in this file should be private to -// pkg/storage package. We should remove the reference to WatchCache -// from Reflector (by changing the Replace method signature in Store -// interface to take resource version too) and move it under pkg/storage. - -// WatchCacheEvent is a single "watch event" that is send to users of -// WatchCache. Additionally to a typical "watch.Event" it contains +// watchCacheEvent is a single "watch event" that is send to users of +// watchCache. Additionally to a typical "watch.Event" it contains // the previous value of the object to enable proper filtering in the // upper layers. -type WatchCacheEvent struct { +type watchCacheEvent struct { Type watch.EventType Object runtime.Object PrevObject runtime.Object @@ -47,15 +43,15 @@ type WatchCacheEvent struct { // itself. type watchCacheElement struct { resourceVersion uint64 - watchCacheEvent WatchCacheEvent + watchCacheEvent watchCacheEvent } -// WatchCache implements a Store interface. +// watchCache implements a Store interface. // However, it depends on the elements implementing runtime.Object interface. // -// WatchCache is a "sliding window" (with a limitted capacity) of objects +// watchCache is a "sliding window" (with a limitted capacity) of objects // observed from a watch. -type WatchCache struct { +type watchCache struct { sync.RWMutex // Maximum size of history window. @@ -73,9 +69,9 @@ type WatchCache struct { // store will effectively support LIST operation from the "end of cache // history" i.e. from the moment just after the newest cached watched event. // It is necessary to effectively allow clients to start watching at now. - store Store + store cache.Store - // ResourceVersion up to which the WatchCache is propagated. + // ResourceVersion up to which the watchCache is propagated. resourceVersion uint64 // This handler is run at the end of every successful Replace() method. @@ -83,21 +79,21 @@ type WatchCache struct { // This handler is run at the end of every Add/Update/Delete method // and additionally gets the previous value of the object. - onEvent func(WatchCacheEvent) + onEvent func(watchCacheEvent) } -func NewWatchCache(capacity int) *WatchCache { - return &WatchCache{ +func newWatchCache(capacity int) *watchCache { + return &watchCache{ capacity: capacity, cache: make([]watchCacheElement, capacity), startIndex: 0, endIndex: 0, - store: NewStore(MetaNamespaceKeyFunc), + store: cache.NewStore(cache.MetaNamespaceKeyFunc), resourceVersion: 0, } } -func (w *WatchCache) Add(obj interface{}) error { +func (w *watchCache) Add(obj interface{}) error { object, resourceVersion, err := objectToVersionedRuntimeObject(obj) if err != nil { return err @@ -108,7 +104,7 @@ func (w *WatchCache) Add(obj interface{}) error { return w.processEvent(event, resourceVersion, f) } -func (w *WatchCache) Update(obj interface{}) error { +func (w *watchCache) Update(obj interface{}) error { object, resourceVersion, err := objectToVersionedRuntimeObject(obj) if err != nil { return err @@ -119,7 +115,7 @@ func (w *WatchCache) Update(obj interface{}) error { return w.processEvent(event, resourceVersion, f) } -func (w *WatchCache) Delete(obj interface{}) error { +func (w *watchCache) Delete(obj interface{}) error { object, resourceVersion, err := objectToVersionedRuntimeObject(obj) if err != nil { return err @@ -153,7 +149,7 @@ func parseResourceVersion(resourceVersion string) (uint64, error) { return strconv.ParseUint(resourceVersion, 10, 64) } -func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error { +func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error { w.Lock() defer w.Unlock() previous, exists, err := w.store.Get(event.Object) @@ -166,7 +162,7 @@ func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, upd } else { prevObject = nil } - watchCacheEvent := WatchCacheEvent{event.Type, event.Object, prevObject} + watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject} if w.onEvent != nil { w.onEvent(watchCacheEvent) } @@ -176,7 +172,7 @@ func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, upd } // Assumes that lock is already held for write. -func (w *WatchCache) updateCache(resourceVersion uint64, event WatchCacheEvent) { +func (w *watchCache) updateCache(resourceVersion uint64, event watchCacheEvent) { if w.endIndex == w.startIndex+w.capacity { // Cache is full - remove the oldest element. w.startIndex++ @@ -185,41 +181,37 @@ func (w *WatchCache) updateCache(resourceVersion uint64, event WatchCacheEvent) w.endIndex++ } -func (w *WatchCache) List() []interface{} { +func (w *watchCache) List() []interface{} { w.RLock() defer w.RUnlock() return w.store.List() } -func (w *WatchCache) ListWithVersion() ([]interface{}, uint64) { +func (w *watchCache) ListWithVersion() ([]interface{}, uint64) { w.RLock() defer w.RUnlock() return w.store.List(), w.resourceVersion } -func (w *WatchCache) ListKeys() []string { +func (w *watchCache) ListKeys() []string { w.RLock() defer w.RUnlock() return w.store.ListKeys() } -func (w *WatchCache) Get(obj interface{}) (interface{}, bool, error) { +func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) { w.RLock() defer w.RUnlock() return w.store.Get(obj) } -func (w *WatchCache) GetByKey(key string) (interface{}, bool, error) { +func (w *watchCache) GetByKey(key string) (interface{}, bool, error) { w.RLock() defer w.RUnlock() return w.store.GetByKey(key) } -func (w *WatchCache) Replace(objs []interface{}) error { - return w.ReplaceWithVersion(objs, "0") -} - -func (w *WatchCache) ReplaceWithVersion(objs []interface{}, resourceVersion string) error { +func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error { version, err := parseResourceVersion(resourceVersion) if err != nil { return err @@ -230,7 +222,7 @@ func (w *WatchCache) ReplaceWithVersion(objs []interface{}, resourceVersion stri w.startIndex = 0 w.endIndex = 0 - if err := w.store.Replace(objs); err != nil { + if err := w.store.Replace(objs, resourceVersion); err != nil { return err } w.resourceVersion = version @@ -240,19 +232,19 @@ func (w *WatchCache) ReplaceWithVersion(objs []interface{}, resourceVersion stri return nil } -func (w *WatchCache) SetOnReplace(onReplace func()) { +func (w *watchCache) SetOnReplace(onReplace func()) { w.Lock() defer w.Unlock() w.onReplace = onReplace } -func (w *WatchCache) SetOnEvent(onEvent func(WatchCacheEvent)) { +func (w *watchCache) SetOnEvent(onEvent func(watchCacheEvent)) { w.Lock() defer w.Unlock() w.onEvent = onEvent } -func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]WatchCacheEvent, error) { +func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]watchCacheEvent, error) { size := w.endIndex - w.startIndex oldest := w.resourceVersion if size > 0 { @@ -268,14 +260,14 @@ func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]Wa return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion } first := sort.Search(size, f) - result := make([]WatchCacheEvent, size-first) + result := make([]watchCacheEvent, size-first) for i := 0; i < size-first; i++ { result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent } return result, nil } -func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]WatchCacheEvent, error) { +func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]watchCacheEvent, error) { w.RLock() defer w.RUnlock() return w.GetAllEventsSinceThreadUnsafe(resourceVersion) diff --git a/pkg/client/unversioned/cache/watch_cache_test.go b/pkg/storage/watch_cache_test.go similarity index 82% rename from pkg/client/unversioned/cache/watch_cache_test.go rename to pkg/storage/watch_cache_test.go index 8f10c197cf4..f94e35d94f4 100644 --- a/pkg/client/unversioned/cache/watch_cache_test.go +++ b/pkg/storage/watch_cache_test.go @@ -14,13 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package cache +package storage import ( "strconv" "testing" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/unversioned/cache" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/watch" ) @@ -36,7 +38,7 @@ func makeTestPod(name string, resourceVersion uint64) *api.Pod { } func TestWatchCacheBasic(t *testing.T) { - store := NewWatchCache(2) + store := newWatchCache(2) // Test Add/Update/Delete. pod1 := makeTestPod("pod", 1) @@ -90,7 +92,7 @@ func TestWatchCacheBasic(t *testing.T) { store.Replace([]interface{}{ makeTestPod("pod4", 7), makeTestPod("pod5", 8), - }) + }, "8") { podNames := util.StringSet{} for _, item := range store.List() { @@ -106,7 +108,7 @@ func TestWatchCacheBasic(t *testing.T) { } func TestEvents(t *testing.T) { - store := NewWatchCache(5) + store := newWatchCache(5) store.Add(makeTestPod("pod", 2)) @@ -221,3 +223,44 @@ func TestEvents(t *testing.T) { } } } + +type testLW struct { + ListFunc func() (runtime.Object, error) + WatchFunc func(resourceVersion string) (watch.Interface, error) +} + +func (t *testLW) List() (runtime.Object, error) { return t.ListFunc() } +func (t *testLW) Watch(resourceVersion string) (watch.Interface, error) { + return t.WatchFunc(resourceVersion) +} + +func TestReflectorForWatchCache(t *testing.T) { + store := newWatchCache(5) + + { + _, version := store.ListWithVersion() + if version != 0 { + t.Errorf("unexpected resource version: %d", version) + } + } + + lw := &testLW{ + WatchFunc: func(rv string) (watch.Interface, error) { + fw := watch.NewFake() + go fw.Stop() + return fw, nil + }, + ListFunc: func() (runtime.Object, error) { + return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "10"}}, nil + }, + } + r := cache.NewReflector(lw, &api.Pod{}, store, 0) + r.ListAndWatch(util.NeverStop) + + { + _, version := store.ListWithVersion() + if version != 10 { + t.Errorf("unexpected resource version: %d", version) + } + } +} diff --git a/test/e2e/daemon_restart.go b/test/e2e/daemon_restart.go index dbea4af1365..57f8b2d0fa0 100644 --- a/test/e2e/daemon_restart.go +++ b/test/e2e/daemon_restart.go @@ -131,7 +131,7 @@ func replacePods(pods []*api.Pod, store cache.Store) { for i := range pods { found = append(found, pods[i]) } - expectNoError(store.Replace(found)) + expectNoError(store.Replace(found, "0")) } // getContainerRestarts returns the count of container restarts across all pods matching the given labelSelector,