Add poller to cache.

This commit is contained in:
Daniel Smith 2014-08-18 14:47:20 -07:00
parent dddad888b5
commit 4c4ca59050
7 changed files with 290 additions and 48 deletions

View File

@ -18,6 +18,8 @@ package cache
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// FIFO receives adds and updates from a Reflector, and puts them in a queue for
@ -33,30 +35,30 @@ type FIFO struct {
}
// Add inserts an item, and puts it in the queue.
func (f *FIFO) Add(ID string, obj interface{}) {
func (f *FIFO) Add(id string, obj interface{}) {
f.lock.Lock()
defer f.lock.Unlock()
f.items[ID] = obj
f.queue = append(f.queue, ID)
f.items[id] = obj
f.queue = append(f.queue, id)
f.cond.Broadcast()
}
// Update updates an item, and adds it to the queue.
func (f *FIFO) Update(ID string, obj interface{}) {
func (f *FIFO) Update(id string, obj interface{}) {
f.lock.Lock()
defer f.lock.Unlock()
f.items[ID] = obj
f.queue = append(f.queue, ID)
f.items[id] = obj
f.queue = append(f.queue, id)
f.cond.Broadcast()
}
// Delete removes an item. It doesn't add it to the queue, because
// this implementation assumes the consumer only cares about the objects,
// not the order in which they were created/added.
func (f *FIFO) Delete(ID string, obj interface{}) {
func (f *FIFO) Delete(id string) {
f.lock.Lock()
defer f.lock.Unlock()
delete(f.items, ID)
delete(f.items, id)
}
// List returns a list of all the items.
@ -70,11 +72,24 @@ func (f *FIFO) List() []interface{} {
return list
}
// Contains returns a util.StringSet containing all IDs of stored the items.
// This is a snapshot of a moment in time, and one should keep in mind that
// other go routines can add or remove items after you call this.
func (c *FIFO) Contains() util.StringSet {
c.lock.RLock()
defer c.lock.RUnlock()
set := util.StringSet{}
for id := range c.items {
set.Insert(id)
}
return set
}
// Get returns the requested item, or sets exists=false.
func (f *FIFO) Get(ID string) (item interface{}, exists bool) {
func (f *FIFO) Get(id string) (item interface{}, exists bool) {
f.lock.RLock()
defer f.lock.RUnlock()
item, exists = f.items[ID]
item, exists = f.items[id]
return item, exists
}

81
pkg/client/cache/poller.go vendored Normal file
View File

@ -0,0 +1,81 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
// Enumerator should be able to return the list of objects to be synced with
// one object at a time.
type Enumerator interface {
Len() int
Get(index int) (ID string, object interface{})
}
// GetFunc should return an enumerator that you wish the Poller to proccess.
type GetFunc func() (Enumerator, error)
// Poller is like Reflector, but it periodically polls instead of watching.
// This is intended to be a workaround for api objects that don't yet support
// watching.
type Poller struct {
getFunc GetFunc
period time.Duration
store Store
}
// NewPoller constructs a new poller. Note that polling probably doesn't make much
// sense to use along with the FIFO queue. The returned Poller will call getFunc and
// sync the objects in 'store' with the returned Enumerator, waiting 'period' between
// each call. It probably only makes sense to use a poller if you're treating the
// store as read-only.
func NewPoller(getFunc GetFunc, period time.Duration, store Store) *Poller {
return &Poller{
getFunc: getFunc,
period: period,
store: store,
}
}
// Run begins polling. It starts a goroutine and returns immediately.
func (p *Poller) Run() {
go util.Forever(func() {
e, err := p.getFunc()
if err != nil {
glog.Errorf("failed to list: %v", err)
return
}
p.sync(e)
}, p.period)
}
func (p *Poller) sync(e Enumerator) {
current := p.store.Contains()
for i := 0; i < e.Len(); i++ {
id, object := e.Get(i)
p.store.Update(id, object)
current.Delete(id)
}
// Delete all the objects not found.
for id := range current {
p.store.Delete(id)
}
}

119
pkg/client/cache/poller_test.go vendored Normal file
View File

@ -0,0 +1,119 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"errors"
"reflect"
"testing"
"time"
)
type testPair struct {
id string
obj interface{}
}
type testEnumerator []testPair
func (t testEnumerator) Len() int { return len(t) }
func (t testEnumerator) Get(i int) (string, interface{}) {
return t[i].id, t[i].obj
}
func TestPoller_sync(t *testing.T) {
table := []struct {
// each step simulates the list that a getFunc would receive.
steps [][]testPair
}{
{
steps: [][]testPair{
{
{"foo", "foo1"},
{"bar", "bar1"},
{"baz", "baz1"},
{"qux", "qux1"},
}, {
{"foo", "foo2"},
{"bar", "bar2"},
{"qux", "qux2"},
}, {
{"bar", "bar3"},
{"baz", "baz2"},
{"qux", "qux3"},
}, {
{"qux", "qux4"},
}, {
{"foo", "foo3"},
},
},
},
}
for testCase, item := range table {
s := NewStore()
// This is a unit test for the sync function, hence the nil getFunc.
p := NewPoller(nil, 0, s)
for line, pairs := range item.steps {
p.sync(testEnumerator(pairs))
ids := s.Contains()
for _, pair := range pairs {
if !ids.Has(pair.id) {
t.Errorf("%v, %v: expected to find entry for %v, but did not.", testCase, line, pair.id)
continue
}
found, ok := s.Get(pair.id)
if !ok {
t.Errorf("%v, %v: unexpected absent entry for %v", testCase, line, pair.id)
continue
}
if e, a := pair.obj, found; !reflect.DeepEqual(e, a) {
t.Errorf("%v, %v: expected %v, got %v for %v", testCase, line, e, a, pair.id)
}
}
if e, a := len(pairs), len(ids); e != a {
t.Errorf("%v, %v: expected len %v, got %v", testCase, line, e, a)
}
}
}
}
func TestPoller_Run(t *testing.T) {
s := NewStore()
const count = 10
var called = 0
done := make(chan struct{})
NewPoller(func() (Enumerator, error) {
called++
if called == count {
close(done)
}
// test both error and regular returns.
if called&1 == 0 {
return testEnumerator{}, nil
}
return nil, errors.New("transient error")
}, time.Millisecond, s).Run()
// The test here is that we get called at least count times.
<-done
// We never added anything, verify that.
if e, a := 0, len(s.Contains()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}

View File

@ -26,29 +26,17 @@ import (
"github.com/golang/glog"
)
// Store is a generic object storage interface. Reflector knows how to watch a server
// and update a store. A generic store is provided, which allows Reflector to be used
// as a local caching system, and an LRU store, which allows Reflector to work like a
// queue of items yet to be processed.
type Store interface {
Add(ID string, obj interface{})
Update(ID string, obj interface{})
Delete(ID string, obj interface{})
List() []interface{}
Get(ID string) (item interface{}, exists bool)
}
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// The type of object we expect to place in the store.
expectedType reflect.Type
// The destination to sync up with the watch source
store Store
// watchCreater is called to initiate watches.
// watchFactory is called to initiate watches.
watchFactory WatchFactory
// loopDelay controls timing between one watch ending and
// period controls timing between one watch ending and
// the beginning of the next one.
loopDelay time.Duration
period time.Duration
}
// WatchFactory should begin a watch at the specified version.
@ -62,7 +50,7 @@ func NewReflector(watchFactory WatchFactory, expectedType interface{}, store Sto
watchFactory: watchFactory,
store: store,
expectedType: reflect.TypeOf(expectedType),
loopDelay: time.Second,
period: time.Second,
}
return gc
}
@ -78,7 +66,7 @@ func (gc *Reflector) Run() {
return
}
gc.watchHandler(w, &resourceVersion)
}, gc.loopDelay)
}, gc.period)
}
// watchHandler watches w and keeps *resourceVersion up to date.
@ -104,13 +92,13 @@ func (gc *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) {
case watch.Modified:
gc.store.Update(jsonBase.ID(), event.Object)
case watch.Deleted:
gc.store.Delete(jsonBase.ID(), event.Object)
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
gc.store.Delete(jsonBase.ID())
default:
glog.Errorf("unable to understand watch event %#v", event)
}
next := jsonBase.ResourceVersion() + 1
if next > *resourceVersion {
*resourceVersion = next
}
*resourceVersion = jsonBase.ResourceVersion() + 1
}
}

View File

@ -30,10 +30,10 @@ func TestReflector_watchHandler(t *testing.T) {
s.Add("foo", &api.Pod{JSONBase: api.JSONBase{ID: "foo"}})
s.Add("bar", &api.Pod{JSONBase: api.JSONBase{ID: "bar"}})
go func() {
fw.Modify(&api.Pod{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 55}})
fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz", ResourceVersion: 32}})
fw.Add(&api.Service{JSONBase: api.JSONBase{ID: "rejected"}})
fw.Delete(&api.Pod{JSONBase: api.JSONBase{ID: "foo"}})
fw.Modify(&api.Pod{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 55}})
fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz", ResourceVersion: 32}})
fw.Stop()
}()
var resumeRV uint64
@ -62,8 +62,8 @@ func TestReflector_watchHandler(t *testing.T) {
}
}
// RV should stay 1 higher than the highest id we see.
if e, a := uint64(56), resumeRV; e != a {
// RV should stay 1 higher than the last id we see.
if e, a := uint64(33), resumeRV; e != a {
t.Errorf("expected %v, got %v", e, a)
}
}
@ -87,7 +87,7 @@ func TestReflector_Run(t *testing.T) {
}
s := NewFIFO()
r := NewReflector(watchStarter, &api.Pod{}, s)
r.loopDelay = 0
r.period = 0
r.Run()
ids := []string{"foo", "bar", "baz", "qux", "zoo"}

View File

@ -18,32 +18,47 @@ package cache
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// Store is a generic object storage interface. Reflector knows how to watch a server
// and update a store. A generic store is provided, which allows Reflector to be used
// as a local caching system, and an LRU store, which allows Reflector to work like a
// queue of items yet to be processed.
type Store interface {
Add(id string, obj interface{})
Update(id string, obj interface{})
Delete(id string)
List() []interface{}
Contains() util.StringSet
Get(id string) (item interface{}, exists bool)
}
type cache struct {
lock sync.RWMutex
items map[string]interface{}
}
// Add inserts an item into the cache.
func (c *cache) Add(ID string, obj interface{}) {
func (c *cache) Add(id string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
c.items[ID] = obj
c.items[id] = obj
}
// Update sets an item in the cache to its updated state.
func (c *cache) Update(ID string, obj interface{}) {
func (c *cache) Update(id string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
c.items[ID] = obj
c.items[id] = obj
}
// Delete removes an item from the cache.
func (c *cache) Delete(ID string, obj interface{}) {
func (c *cache) Delete(id string) {
c.lock.Lock()
defer c.lock.Unlock()
delete(c.items, ID)
delete(c.items, id)
}
// List returns a list of all the items.
@ -58,12 +73,25 @@ func (c *cache) List() []interface{} {
return list
}
// Get returns the requested item, or sets exists=false.
// Get is completely threadsafe as long as you treat all items as immutable.
func (c *cache) Get(ID string) (item interface{}, exists bool) {
// Contains returns a util.StringSet containing all IDs of stored the items.
// This is a snapshot of a moment in time, and one should keep in mind that
// other go routines can add or remove items after you call this.
func (c *cache) Contains() util.StringSet {
c.lock.RLock()
defer c.lock.RUnlock()
item, exists = c.items[ID]
set := util.StringSet{}
for id := range c.items {
set.Insert(id)
}
return set
}
// Get returns the requested item, or sets exists=false.
// Get is completely threadsafe as long as you treat all items as immutable.
func (c *cache) Get(id string) (item interface{}, exists bool) {
c.lock.RLock()
defer c.lock.RUnlock()
item, exists = c.items[id]
return item, exists
}

View File

@ -40,10 +40,12 @@ func doTestStore(t *testing.T, store Store) {
t.Errorf("expected %v, got %v", e, a)
}
}
store.Delete("foo", "qux")
store.Delete("foo")
if _, ok := store.Get("foo"); ok {
t.Errorf("found deleted item??")
}
// Test List
store.Add("a", "b")
store.Add("c", "d")
store.Add("e", "e")
@ -57,6 +59,15 @@ func doTestStore(t *testing.T, store Store) {
if len(found) != 3 {
t.Errorf("extra items")
}
// Check that ID list is correct.
ids := store.Contains()
if !ids.HasAll("a", "c", "e") {
t.Errorf("missing items")
}
if len(ids) != 3 {
t.Errorf("extra items")
}
}
func TestCache(t *testing.T) {