From e202f9c7970c3ad9caaa9cad48295d36c2d14c1e Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 18 Aug 2015 10:34:27 +0200 Subject: [PATCH] Add resource version to Store Replace params. --- contrib/mesos/pkg/queue/historical.go | 2 +- contrib/mesos/pkg/queue/historical_test.go | 2 +- contrib/mesos/pkg/scheduler/plugin.go | 4 +- pkg/client/unversioned/cache/delta_fifo.go | 2 +- .../unversioned/cache/delta_fifo_test.go | 6 +- .../unversioned/cache/expiration_cache.go | 4 +- pkg/client/unversioned/cache/fifo.go | 2 +- pkg/client/unversioned/cache/fifo_test.go | 2 +- pkg/client/unversioned/cache/poller.go | 86 ------------ pkg/client/unversioned/cache/poller_test.go | 132 ------------------ pkg/client/unversioned/cache/reflector.go | 7 +- pkg/client/unversioned/cache/store.go | 6 +- pkg/client/unversioned/cache/store_test.go | 2 +- .../unversioned/cache/thread_safe_store.go | 4 +- pkg/client/unversioned/cache/undelta_store.go | 4 +- .../unversioned/cache/undelta_store_test.go | 2 +- pkg/client/unversioned/cache/watch_cache.go | 8 +- .../unversioned/cache/watch_cache_test.go | 2 +- .../replication_controller_test.go | 2 +- test/e2e/daemon_restart.go | 2 +- 20 files changed, 27 insertions(+), 254 deletions(-) delete mode 100644 pkg/client/unversioned/cache/poller.go delete mode 100644 pkg/client/unversioned/cache/poller_test.go diff --git a/contrib/mesos/pkg/queue/historical.go b/contrib/mesos/pkg/queue/historical.go index 492cb2a9b69..6fe65f21e2b 100644 --- a/contrib/mesos/pkg/queue/historical.go +++ b/contrib/mesos/pkg/queue/historical.go @@ -277,7 +277,7 @@ func (f *HistoricalFIFO) pop(cancel chan struct{}) interface{} { } } -func (f *HistoricalFIFO) Replace(objs []interface{}) error { +func (f *HistoricalFIFO) Replace(objs []interface{}, resourceVersion string) error { notifications := make([]Entry, 0, len(objs)) defer func() { for _, e := range notifications { diff --git a/contrib/mesos/pkg/queue/historical_test.go b/contrib/mesos/pkg/queue/historical_test.go index 4477601beda..587629be6df 100644 --- a/contrib/mesos/pkg/queue/historical_test.go +++ b/contrib/mesos/pkg/queue/historical_test.go @@ -122,7 +122,7 @@ func TestFIFO_addUpdate(t *testing.T) { func TestFIFO_addReplace(t *testing.T) { f := NewHistorical(nil) f.Add(&testObj{"foo", 10}) - f.Replace([]interface{}{&testObj{"foo", 15}}) + f.Replace([]interface{}{&testObj{"foo", 15}}, "0") got := make(chan *testObj, 2) go func() { for { diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index f6c400faeb7..56cc419abf6 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -893,11 +893,11 @@ func (psa *podStoreAdapter) Get(obj interface{}) (interface{}, bool, error) { // Replace will delete the contents of the store, using instead the // given map. This store implementation does NOT take ownership of the map. -func (psa *podStoreAdapter) Replace(objs []interface{}) error { +func (psa *podStoreAdapter) Replace(objs []interface{}, resourceVersion string) error { newobjs := make([]interface{}, len(objs)) for i, v := range objs { pod := v.(*api.Pod) newobjs[i] = &Pod{Pod: pod} } - return psa.FIFO.Replace(newobjs) + return psa.FIFO.Replace(newobjs, resourceVersion) } diff --git a/pkg/client/unversioned/cache/delta_fifo.go b/pkg/client/unversioned/cache/delta_fifo.go index 96315b8d590..4b432208ba4 100644 --- a/pkg/client/unversioned/cache/delta_fifo.go +++ b/pkg/client/unversioned/cache/delta_fifo.go @@ -307,7 +307,7 @@ func (f *DeltaFIFO) Pop() interface{} { // 'f' takes ownership of the map, you should not reference the map again // after calling this function. f's queue is reset, too; upon return, it // will contain the items in the map, in no particular order. -func (f *DeltaFIFO) Replace(list []interface{}) error { +func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { f.lock.Lock() defer f.lock.Unlock() for _, item := range list { diff --git a/pkg/client/unversioned/cache/delta_fifo_test.go b/pkg/client/unversioned/cache/delta_fifo_test.go index 24b2e7739fc..1b2288ab006 100644 --- a/pkg/client/unversioned/cache/delta_fifo_test.go +++ b/pkg/client/unversioned/cache/delta_fifo_test.go @@ -97,7 +97,7 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) { ) f.Add(mkFifoObj("foo", 10)) f.Update(mkFifoObj("foo", 12)) - f.Replace([]interface{}{mkFifoObj("foo", 20)}) + f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0") f.Delete(mkFifoObj("foo", 15)) f.Delete(mkFifoObj("foo", 18)) // flush the last one out expect := []DeltaType{Added, Updated, Sync, Deleted} @@ -165,7 +165,7 @@ func TestDeltaFIFO_enqueueing(t *testing.T) { func TestDeltaFIFO_addReplace(t *testing.T) { f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) f.Add(mkFifoObj("foo", 10)) - f.Replace([]interface{}{mkFifoObj("foo", 15)}) + f.Replace([]interface{}{mkFifoObj("foo", 15)}, "0") got := make(chan testFifoObject, 2) go func() { for { @@ -197,7 +197,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { }), ) f.Delete(mkFifoObj("baz", 10)) - f.Replace([]interface{}{mkFifoObj("foo", 5)}) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") expectedList := []Deltas{ {{Deleted, mkFifoObj("baz", 10)}}, diff --git a/pkg/client/unversioned/cache/expiration_cache.go b/pkg/client/unversioned/cache/expiration_cache.go index 077a38755dc..e6d67c14274 100644 --- a/pkg/client/unversioned/cache/expiration_cache.go +++ b/pkg/client/unversioned/cache/expiration_cache.go @@ -166,7 +166,7 @@ func (c *ExpirationCache) Delete(obj interface{}) error { // Replace will convert all items in the given list to TimestampedEntries // before attempting the replace operation. The replace operation will // delete the contents of the ExpirationCache `c`. -func (c *ExpirationCache) Replace(list []interface{}) error { +func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error { items := map[string]interface{}{} ts := c.clock.Now() for _, item := range list { @@ -176,7 +176,7 @@ func (c *ExpirationCache) Replace(list []interface{}) error { } items[key] = ×tampedEntry{item, ts} } - c.cacheStorage.Replace(items) + c.cacheStorage.Replace(items, resourceVersion) return nil } diff --git a/pkg/client/unversioned/cache/fifo.go b/pkg/client/unversioned/cache/fifo.go index 45e5f669411..b56687ee0fd 100644 --- a/pkg/client/unversioned/cache/fifo.go +++ b/pkg/client/unversioned/cache/fifo.go @@ -188,7 +188,7 @@ func (f *FIFO) Pop() interface{} { // 'f' takes ownership of the map, you should not reference the map again // after calling this function. f's queue is reset, too; upon return, it // will contain the items in the map, in no particular order. -func (f *FIFO) Replace(list []interface{}) error { +func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { items := map[string]interface{}{} for _, item := range list { key, err := f.keyFunc(item) diff --git a/pkg/client/unversioned/cache/fifo_test.go b/pkg/client/unversioned/cache/fifo_test.go index cc6f27b7940..bf99b5aa152 100644 --- a/pkg/client/unversioned/cache/fifo_test.go +++ b/pkg/client/unversioned/cache/fifo_test.go @@ -107,7 +107,7 @@ func TestFIFO_addUpdate(t *testing.T) { func TestFIFO_addReplace(t *testing.T) { f := NewFIFO(testFifoObjectKeyFunc) f.Add(mkFifoObj("foo", 10)) - f.Replace([]interface{}{mkFifoObj("foo", 15)}) + f.Replace([]interface{}{mkFifoObj("foo", 15)}, "15") got := make(chan testFifoObject, 2) go func() { for { diff --git a/pkg/client/unversioned/cache/poller.go b/pkg/client/unversioned/cache/poller.go deleted file mode 100644 index ef73ae29f31..00000000000 --- a/pkg/client/unversioned/cache/poller.go +++ /dev/null @@ -1,86 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cache - -import ( - "time" - - "github.com/golang/glog" - "k8s.io/kubernetes/pkg/util" -) - -// Enumerator should be able to return the list of objects to be synced with -// one object at a time. -type Enumerator interface { - Len() int - Get(index int) (object interface{}) -} - -// GetFunc should return an enumerator that you wish the Poller to process. -type GetFunc func() (Enumerator, error) - -// Poller is like Reflector, but it periodically polls instead of watching. -// This is intended to be a workaround for api objects that don't yet support -// watching. -type Poller struct { - getFunc GetFunc - period time.Duration - store Store -} - -// NewPoller constructs a new poller. Note that polling probably doesn't make much -// sense to use along with the FIFO queue. The returned Poller will call getFunc and -// sync the objects in 'store' with the returned Enumerator, waiting 'period' between -// each call. It probably only makes sense to use a poller if you're treating the -// store as read-only. -func NewPoller(getFunc GetFunc, period time.Duration, store Store) *Poller { - return &Poller{ - getFunc: getFunc, - period: period, - store: store, - } -} - -// Run begins polling. It starts a goroutine and returns immediately. -func (p *Poller) Run() { - go util.Until(p.run, p.period, util.NeverStop) -} - -// RunUntil begins polling. It starts a goroutine and returns immediately. -// It will stop when the stopCh is closed. -func (p *Poller) RunUntil(stopCh <-chan struct{}) { - go util.Until(p.run, p.period, stopCh) -} - -func (p *Poller) run() { - e, err := p.getFunc() - if err != nil { - glog.Errorf("failed to list: %v", err) - return - } - p.sync(e) -} - -func (p *Poller) sync(e Enumerator) { - items := []interface{}{} - for i := 0; i < e.Len(); i++ { - object := e.Get(i) - items = append(items, object) - } - - p.store.Replace(items) -} diff --git a/pkg/client/unversioned/cache/poller_test.go b/pkg/client/unversioned/cache/poller_test.go deleted file mode 100644 index 2d3951d8cbd..00000000000 --- a/pkg/client/unversioned/cache/poller_test.go +++ /dev/null @@ -1,132 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cache - -import ( - "errors" - "reflect" - "testing" - "time" -) - -func testPairKeyFunc(obj interface{}) (string, error) { - return obj.(testPair).id, nil -} - -type testPair struct { - id string - obj interface{} -} -type testEnumerator []testPair - -func (t testEnumerator) Len() int { return len(t) } -func (t testEnumerator) Get(i int) interface{} { - return t[i] -} - -func TestPoller_sync(t *testing.T) { - table := []struct { - // each step simulates the list that a getFunc would receive. - steps [][]testPair - }{ - { - steps: [][]testPair{ - { - {"foo", "foo1"}, - {"bar", "bar1"}, - {"baz", "baz1"}, - {"qux", "qux1"}, - }, { - {"foo", "foo2"}, - {"bar", "bar2"}, - {"qux", "qux2"}, - }, { - {"bar", "bar3"}, - {"baz", "baz2"}, - {"qux", "qux3"}, - }, { - {"qux", "qux4"}, - }, { - {"foo", "foo3"}, - }, - }, - }, - } - - for testCase, item := range table { - s := NewStore(testPairKeyFunc) - // This is a unit test for the sync function, hence the nil getFunc. - p := NewPoller(nil, 0, s) - for line, pairs := range item.steps { - p.sync(testEnumerator(pairs)) - - list := s.List() - for _, pair := range pairs { - foundInList := false - for _, listItem := range list { - id, _ := testPairKeyFunc(listItem) - if pair.id == id { - foundInList = true - } - } - if !foundInList { - t.Errorf("%v, %v: expected to find list entry for %v, but did not.", testCase, line, pair.id) - continue - } - found, ok, _ := s.Get(pair) - if !ok { - t.Errorf("%v, %v: unexpected absent entry for %v", testCase, line, pair.id) - continue - } - if e, a := pair.obj, found.(testPair).obj; !reflect.DeepEqual(e, a) { - t.Errorf("%v, %v: expected %v, got %v for %v", testCase, line, e, a, pair.id) - } - } - if e, a := len(pairs), len(list); e != a { - t.Errorf("%v, %v: expected len %v, got %v", testCase, line, e, a) - } - } - } -} - -func TestPoller_Run(t *testing.T) { - stopCh := make(chan struct{}) - defer func() { stopCh <- struct{}{} }() - s := NewStore(testPairKeyFunc) - const count = 10 - var called = 0 - done := make(chan struct{}) - NewPoller(func() (Enumerator, error) { - called++ - if called == count { - close(done) - } - // test both error and regular returns. - if called&1 == 0 { - return testEnumerator{}, nil - } - return nil, errors.New("transient error") - }, time.Millisecond, s).RunUntil(stopCh) - - // The test here is that we get called at least count times. - <-done - - // We never added anything, verify that. - if e, a := 0, len(s.List()); e != a { - t.Errorf("expected %v, got %v", e, a) - } -} diff --git a/pkg/client/unversioned/cache/reflector.go b/pkg/client/unversioned/cache/reflector.go index 85a5ff58ebb..4c2d664205a 100644 --- a/pkg/client/unversioned/cache/reflector.go +++ b/pkg/client/unversioned/cache/reflector.go @@ -234,12 +234,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err for _, item := range items { found = append(found, item) } - - myStore, ok := r.store.(*WatchCache) - if ok { - return myStore.ReplaceWithVersion(found, resourceVersion) - } - return r.store.Replace(found) + return r.store.Replace(found, resourceVersion) } // watchHandler watches w and keeps *resourceVersion up to date. diff --git a/pkg/client/unversioned/cache/store.go b/pkg/client/unversioned/cache/store.go index 3972db9b9c8..ce16f9c8b74 100644 --- a/pkg/client/unversioned/cache/store.go +++ b/pkg/client/unversioned/cache/store.go @@ -43,7 +43,7 @@ type Store interface { // Replace will delete the contents of the store, using instead the // given list. Store takes ownership of the list, you should not reference // it after calling this function. - Replace([]interface{}) error + Replace([]interface{}, string) error } // KeyFunc knows how to make a key from an object. Implementations should be deterministic. @@ -193,7 +193,7 @@ func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) // Replace will delete the contents of 'c', using instead the given list. // 'c' takes ownership of the list, you should not reference the list again // after calling this function. -func (c *cache) Replace(list []interface{}) error { +func (c *cache) Replace(list []interface{}, resourceVersion string) error { items := map[string]interface{}{} for _, item := range list { key, err := c.keyFunc(item) @@ -202,7 +202,7 @@ func (c *cache) Replace(list []interface{}) error { } items[key] = item } - c.cacheStorage.Replace(items) + c.cacheStorage.Replace(items, resourceVersion) return nil } diff --git a/pkg/client/unversioned/cache/store_test.go b/pkg/client/unversioned/cache/store_test.go index f3d2e2fb73c..2d3b153af7a 100644 --- a/pkg/client/unversioned/cache/store_test.go +++ b/pkg/client/unversioned/cache/store_test.go @@ -70,7 +70,7 @@ func doTestStore(t *testing.T, store Store) { store.Replace([]interface{}{ mkObj("foo", "foo"), mkObj("bar", "bar"), - }) + }, "0") { found := util.StringSet{} diff --git a/pkg/client/unversioned/cache/thread_safe_store.go b/pkg/client/unversioned/cache/thread_safe_store.go index 5480f39a9ca..20113937890 100644 --- a/pkg/client/unversioned/cache/thread_safe_store.go +++ b/pkg/client/unversioned/cache/thread_safe_store.go @@ -41,7 +41,7 @@ type ThreadSafeStore interface { Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string - Replace(map[string]interface{}) + Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) ListIndexFuncValues(name string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) @@ -112,7 +112,7 @@ func (c *threadSafeMap) ListKeys() []string { return list } -func (c *threadSafeMap) Replace(items map[string]interface{}) { +func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) { c.lock.Lock() defer c.lock.Unlock() c.items = items diff --git a/pkg/client/unversioned/cache/undelta_store.go b/pkg/client/unversioned/cache/undelta_store.go index 5cd6378eaf8..4a8a4500e34 100644 --- a/pkg/client/unversioned/cache/undelta_store.go +++ b/pkg/client/unversioned/cache/undelta_store.go @@ -66,8 +66,8 @@ func (u *UndeltaStore) Delete(obj interface{}) error { return nil } -func (u *UndeltaStore) Replace(list []interface{}) error { - if err := u.Store.Replace(list); err != nil { +func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error { + if err := u.Store.Replace(list, resourceVersion); err != nil { return err } u.PushFunc(u.Store.List()) diff --git a/pkg/client/unversioned/cache/undelta_store_test.go b/pkg/client/unversioned/cache/undelta_store_test.go index d6aa3738a67..c14b7a80087 100644 --- a/pkg/client/unversioned/cache/undelta_store_test.go +++ b/pkg/client/unversioned/cache/undelta_store_test.go @@ -120,7 +120,7 @@ func TestReplaceCallsPush(t *testing.T) { m := []interface{}{mkObj("a", 1)} - u.Replace(m) + u.Replace(m, "0") if callcount != 1 { t.Errorf("Expected 1 calls, got %d", callcount) } diff --git a/pkg/client/unversioned/cache/watch_cache.go b/pkg/client/unversioned/cache/watch_cache.go index a40c83a7cd6..ae49f0f851f 100644 --- a/pkg/client/unversioned/cache/watch_cache.go +++ b/pkg/client/unversioned/cache/watch_cache.go @@ -215,11 +215,7 @@ func (w *WatchCache) GetByKey(key string) (interface{}, bool, error) { return w.store.GetByKey(key) } -func (w *WatchCache) Replace(objs []interface{}) error { - return w.ReplaceWithVersion(objs, "0") -} - -func (w *WatchCache) ReplaceWithVersion(objs []interface{}, resourceVersion string) error { +func (w *WatchCache) Replace(objs []interface{}, resourceVersion string) error { version, err := parseResourceVersion(resourceVersion) if err != nil { return err @@ -230,7 +226,7 @@ func (w *WatchCache) ReplaceWithVersion(objs []interface{}, resourceVersion stri w.startIndex = 0 w.endIndex = 0 - if err := w.store.Replace(objs); err != nil { + if err := w.store.Replace(objs, resourceVersion); err != nil { return err } w.resourceVersion = version diff --git a/pkg/client/unversioned/cache/watch_cache_test.go b/pkg/client/unversioned/cache/watch_cache_test.go index 8f10c197cf4..be827ba556f 100644 --- a/pkg/client/unversioned/cache/watch_cache_test.go +++ b/pkg/client/unversioned/cache/watch_cache_test.go @@ -90,7 +90,7 @@ func TestWatchCacheBasic(t *testing.T) { store.Replace([]interface{}{ makeTestPod("pod4", 7), makeTestPod("pod5", 8), - }) + }, "8") { podNames := util.StringSet{} for _, item := range store.List() { diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index 456771d8c21..00942fdc109 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -908,7 +908,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { // This should have no effect, since we've deleted the rc. podExp.Seen(1, 0) - manager.podStore.Store.Replace(make([]interface{}, 0)) + manager.podStore.Store.Replace(make([]interface{}, 0), "0") manager.syncReplicationController(getKey(rc, t)) validateSyncReplication(t, &fakePodControl, 0, 0) } diff --git a/test/e2e/daemon_restart.go b/test/e2e/daemon_restart.go index dbea4af1365..57f8b2d0fa0 100644 --- a/test/e2e/daemon_restart.go +++ b/test/e2e/daemon_restart.go @@ -131,7 +131,7 @@ func replacePods(pods []*api.Pod, store cache.Store) { for i := range pods { found = append(found, pods[i]) } - expectNoError(store.Replace(found)) + expectNoError(store.Replace(found, "0")) } // getContainerRestarts returns the count of container restarts across all pods matching the given labelSelector,