From 4c4ca590503f39cdd6d09054c454b52f17441f8c Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 18 Aug 2014 14:47:20 -0700 Subject: [PATCH] Add poller to cache. --- pkg/client/cache/fifo.go | 35 ++++++--- pkg/client/cache/poller.go | 81 ++++++++++++++++++++ pkg/client/cache/poller_test.go | 119 +++++++++++++++++++++++++++++ pkg/client/cache/reflector.go | 32 +++----- pkg/client/cache/reflector_test.go | 10 +-- pkg/client/cache/store.go | 48 +++++++++--- pkg/client/cache/store_test.go | 13 +++- 7 files changed, 290 insertions(+), 48 deletions(-) create mode 100644 pkg/client/cache/poller.go create mode 100644 pkg/client/cache/poller_test.go diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go index 8189305812a..07463e72c35 100644 --- a/pkg/client/cache/fifo.go +++ b/pkg/client/cache/fifo.go @@ -18,6 +18,8 @@ package cache import ( "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) // FIFO receives adds and updates from a Reflector, and puts them in a queue for @@ -33,30 +35,30 @@ type FIFO struct { } // Add inserts an item, and puts it in the queue. -func (f *FIFO) Add(ID string, obj interface{}) { +func (f *FIFO) Add(id string, obj interface{}) { f.lock.Lock() defer f.lock.Unlock() - f.items[ID] = obj - f.queue = append(f.queue, ID) + f.items[id] = obj + f.queue = append(f.queue, id) f.cond.Broadcast() } // Update updates an item, and adds it to the queue. -func (f *FIFO) Update(ID string, obj interface{}) { +func (f *FIFO) Update(id string, obj interface{}) { f.lock.Lock() defer f.lock.Unlock() - f.items[ID] = obj - f.queue = append(f.queue, ID) + f.items[id] = obj + f.queue = append(f.queue, id) f.cond.Broadcast() } // Delete removes an item. It doesn't add it to the queue, because // this implementation assumes the consumer only cares about the objects, // not the order in which they were created/added. -func (f *FIFO) Delete(ID string, obj interface{}) { +func (f *FIFO) Delete(id string) { f.lock.Lock() defer f.lock.Unlock() - delete(f.items, ID) + delete(f.items, id) } // List returns a list of all the items. @@ -70,11 +72,24 @@ func (f *FIFO) List() []interface{} { return list } +// Contains returns a util.StringSet containing all IDs of stored the items. +// This is a snapshot of a moment in time, and one should keep in mind that +// other go routines can add or remove items after you call this. +func (c *FIFO) Contains() util.StringSet { + c.lock.RLock() + defer c.lock.RUnlock() + set := util.StringSet{} + for id := range c.items { + set.Insert(id) + } + return set +} + // Get returns the requested item, or sets exists=false. -func (f *FIFO) Get(ID string) (item interface{}, exists bool) { +func (f *FIFO) Get(id string) (item interface{}, exists bool) { f.lock.RLock() defer f.lock.RUnlock() - item, exists = f.items[ID] + item, exists = f.items[id] return item, exists } diff --git a/pkg/client/cache/poller.go b/pkg/client/cache/poller.go new file mode 100644 index 00000000000..d2c379b7868 --- /dev/null +++ b/pkg/client/cache/poller.go @@ -0,0 +1,81 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" +) + +// Enumerator should be able to return the list of objects to be synced with +// one object at a time. +type Enumerator interface { + Len() int + Get(index int) (ID string, object interface{}) +} + +// GetFunc should return an enumerator that you wish the Poller to proccess. +type GetFunc func() (Enumerator, error) + +// Poller is like Reflector, but it periodically polls instead of watching. +// This is intended to be a workaround for api objects that don't yet support +// watching. +type Poller struct { + getFunc GetFunc + period time.Duration + store Store +} + +// NewPoller constructs a new poller. Note that polling probably doesn't make much +// sense to use along with the FIFO queue. The returned Poller will call getFunc and +// sync the objects in 'store' with the returned Enumerator, waiting 'period' between +// each call. It probably only makes sense to use a poller if you're treating the +// store as read-only. +func NewPoller(getFunc GetFunc, period time.Duration, store Store) *Poller { + return &Poller{ + getFunc: getFunc, + period: period, + store: store, + } +} + +// Run begins polling. It starts a goroutine and returns immediately. +func (p *Poller) Run() { + go util.Forever(func() { + e, err := p.getFunc() + if err != nil { + glog.Errorf("failed to list: %v", err) + return + } + p.sync(e) + }, p.period) +} + +func (p *Poller) sync(e Enumerator) { + current := p.store.Contains() + for i := 0; i < e.Len(); i++ { + id, object := e.Get(i) + p.store.Update(id, object) + current.Delete(id) + } + // Delete all the objects not found. + for id := range current { + p.store.Delete(id) + } +} diff --git a/pkg/client/cache/poller_test.go b/pkg/client/cache/poller_test.go new file mode 100644 index 00000000000..5ebfc21f260 --- /dev/null +++ b/pkg/client/cache/poller_test.go @@ -0,0 +1,119 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "errors" + "reflect" + "testing" + "time" +) + +type testPair struct { + id string + obj interface{} +} +type testEnumerator []testPair + +func (t testEnumerator) Len() int { return len(t) } +func (t testEnumerator) Get(i int) (string, interface{}) { + return t[i].id, t[i].obj +} + +func TestPoller_sync(t *testing.T) { + table := []struct { + // each step simulates the list that a getFunc would receive. + steps [][]testPair + }{ + { + steps: [][]testPair{ + { + {"foo", "foo1"}, + {"bar", "bar1"}, + {"baz", "baz1"}, + {"qux", "qux1"}, + }, { + {"foo", "foo2"}, + {"bar", "bar2"}, + {"qux", "qux2"}, + }, { + {"bar", "bar3"}, + {"baz", "baz2"}, + {"qux", "qux3"}, + }, { + {"qux", "qux4"}, + }, { + {"foo", "foo3"}, + }, + }, + }, + } + + for testCase, item := range table { + s := NewStore() + // This is a unit test for the sync function, hence the nil getFunc. + p := NewPoller(nil, 0, s) + for line, pairs := range item.steps { + p.sync(testEnumerator(pairs)) + + ids := s.Contains() + for _, pair := range pairs { + if !ids.Has(pair.id) { + t.Errorf("%v, %v: expected to find entry for %v, but did not.", testCase, line, pair.id) + continue + } + found, ok := s.Get(pair.id) + if !ok { + t.Errorf("%v, %v: unexpected absent entry for %v", testCase, line, pair.id) + continue + } + if e, a := pair.obj, found; !reflect.DeepEqual(e, a) { + t.Errorf("%v, %v: expected %v, got %v for %v", testCase, line, e, a, pair.id) + } + } + if e, a := len(pairs), len(ids); e != a { + t.Errorf("%v, %v: expected len %v, got %v", testCase, line, e, a) + } + } + } +} + +func TestPoller_Run(t *testing.T) { + s := NewStore() + const count = 10 + var called = 0 + done := make(chan struct{}) + NewPoller(func() (Enumerator, error) { + called++ + if called == count { + close(done) + } + // test both error and regular returns. + if called&1 == 0 { + return testEnumerator{}, nil + } + return nil, errors.New("transient error") + }, time.Millisecond, s).Run() + + // The test here is that we get called at least count times. + <-done + + // We never added anything, verify that. + if e, a := 0, len(s.Contains()); e != a { + t.Errorf("expected %v, got %v", e, a) + } +} diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 14b97903b47..4df55155ab7 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -26,29 +26,17 @@ import ( "github.com/golang/glog" ) -// Store is a generic object storage interface. Reflector knows how to watch a server -// and update a store. A generic store is provided, which allows Reflector to be used -// as a local caching system, and an LRU store, which allows Reflector to work like a -// queue of items yet to be processed. -type Store interface { - Add(ID string, obj interface{}) - Update(ID string, obj interface{}) - Delete(ID string, obj interface{}) - List() []interface{} - Get(ID string) (item interface{}, exists bool) -} - // Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { // The type of object we expect to place in the store. expectedType reflect.Type // The destination to sync up with the watch source store Store - // watchCreater is called to initiate watches. + // watchFactory is called to initiate watches. watchFactory WatchFactory - // loopDelay controls timing between one watch ending and + // period controls timing between one watch ending and // the beginning of the next one. - loopDelay time.Duration + period time.Duration } // WatchFactory should begin a watch at the specified version. @@ -62,7 +50,7 @@ func NewReflector(watchFactory WatchFactory, expectedType interface{}, store Sto watchFactory: watchFactory, store: store, expectedType: reflect.TypeOf(expectedType), - loopDelay: time.Second, + period: time.Second, } return gc } @@ -78,7 +66,7 @@ func (gc *Reflector) Run() { return } gc.watchHandler(w, &resourceVersion) - }, gc.loopDelay) + }, gc.period) } // watchHandler watches w and keeps *resourceVersion up to date. @@ -104,13 +92,13 @@ func (gc *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) { case watch.Modified: gc.store.Update(jsonBase.ID(), event.Object) case watch.Deleted: - gc.store.Delete(jsonBase.ID(), event.Object) + // TODO: Will any consumers need access to the "last known + // state", which is passed in event.Object? If so, may need + // to change this. + gc.store.Delete(jsonBase.ID()) default: glog.Errorf("unable to understand watch event %#v", event) } - next := jsonBase.ResourceVersion() + 1 - if next > *resourceVersion { - *resourceVersion = next - } + *resourceVersion = jsonBase.ResourceVersion() + 1 } } diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index fa2b702a64a..89e5b7386da 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -30,10 +30,10 @@ func TestReflector_watchHandler(t *testing.T) { s.Add("foo", &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) s.Add("bar", &api.Pod{JSONBase: api.JSONBase{ID: "bar"}}) go func() { - fw.Modify(&api.Pod{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 55}}) - fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz", ResourceVersion: 32}}) fw.Add(&api.Service{JSONBase: api.JSONBase{ID: "rejected"}}) fw.Delete(&api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) + fw.Modify(&api.Pod{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 55}}) + fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz", ResourceVersion: 32}}) fw.Stop() }() var resumeRV uint64 @@ -62,8 +62,8 @@ func TestReflector_watchHandler(t *testing.T) { } } - // RV should stay 1 higher than the highest id we see. - if e, a := uint64(56), resumeRV; e != a { + // RV should stay 1 higher than the last id we see. + if e, a := uint64(33), resumeRV; e != a { t.Errorf("expected %v, got %v", e, a) } } @@ -87,7 +87,7 @@ func TestReflector_Run(t *testing.T) { } s := NewFIFO() r := NewReflector(watchStarter, &api.Pod{}, s) - r.loopDelay = 0 + r.period = 0 r.Run() ids := []string{"foo", "bar", "baz", "qux", "zoo"} diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 016aa18ce84..4b27aa29099 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -18,32 +18,47 @@ package cache import ( "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) +// Store is a generic object storage interface. Reflector knows how to watch a server +// and update a store. A generic store is provided, which allows Reflector to be used +// as a local caching system, and an LRU store, which allows Reflector to work like a +// queue of items yet to be processed. +type Store interface { + Add(id string, obj interface{}) + Update(id string, obj interface{}) + Delete(id string) + List() []interface{} + Contains() util.StringSet + Get(id string) (item interface{}, exists bool) +} + type cache struct { lock sync.RWMutex items map[string]interface{} } // Add inserts an item into the cache. -func (c *cache) Add(ID string, obj interface{}) { +func (c *cache) Add(id string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() - c.items[ID] = obj + c.items[id] = obj } // Update sets an item in the cache to its updated state. -func (c *cache) Update(ID string, obj interface{}) { +func (c *cache) Update(id string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() - c.items[ID] = obj + c.items[id] = obj } // Delete removes an item from the cache. -func (c *cache) Delete(ID string, obj interface{}) { +func (c *cache) Delete(id string) { c.lock.Lock() defer c.lock.Unlock() - delete(c.items, ID) + delete(c.items, id) } // List returns a list of all the items. @@ -58,12 +73,25 @@ func (c *cache) List() []interface{} { return list } -// Get returns the requested item, or sets exists=false. -// Get is completely threadsafe as long as you treat all items as immutable. -func (c *cache) Get(ID string) (item interface{}, exists bool) { +// Contains returns a util.StringSet containing all IDs of stored the items. +// This is a snapshot of a moment in time, and one should keep in mind that +// other go routines can add or remove items after you call this. +func (c *cache) Contains() util.StringSet { c.lock.RLock() defer c.lock.RUnlock() - item, exists = c.items[ID] + set := util.StringSet{} + for id := range c.items { + set.Insert(id) + } + return set +} + +// Get returns the requested item, or sets exists=false. +// Get is completely threadsafe as long as you treat all items as immutable. +func (c *cache) Get(id string) (item interface{}, exists bool) { + c.lock.RLock() + defer c.lock.RUnlock() + item, exists = c.items[id] return item, exists } diff --git a/pkg/client/cache/store_test.go b/pkg/client/cache/store_test.go index 9154488c31a..4220d284f4a 100644 --- a/pkg/client/cache/store_test.go +++ b/pkg/client/cache/store_test.go @@ -40,10 +40,12 @@ func doTestStore(t *testing.T, store Store) { t.Errorf("expected %v, got %v", e, a) } } - store.Delete("foo", "qux") + store.Delete("foo") if _, ok := store.Get("foo"); ok { t.Errorf("found deleted item??") } + + // Test List store.Add("a", "b") store.Add("c", "d") store.Add("e", "e") @@ -57,6 +59,15 @@ func doTestStore(t *testing.T, store Store) { if len(found) != 3 { t.Errorf("extra items") } + + // Check that ID list is correct. + ids := store.Contains() + if !ids.HasAll("a", "c", "e") { + t.Errorf("missing items") + } + if len(ids) != 3 { + t.Errorf("extra items") + } } func TestCache(t *testing.T) {