Merge pull request #12848 from wojtek-t/private_watch_cache

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-09-03 13:06:18 -07:00
commit 5d8a6049be
22 changed files with 113 additions and 332 deletions

View File

@ -277,7 +277,7 @@ func (f *HistoricalFIFO) pop(cancel chan struct{}) interface{} {
} }
} }
func (f *HistoricalFIFO) Replace(objs []interface{}) error { func (f *HistoricalFIFO) Replace(objs []interface{}, resourceVersion string) error {
notifications := make([]Entry, 0, len(objs)) notifications := make([]Entry, 0, len(objs))
defer func() { defer func() {
for _, e := range notifications { for _, e := range notifications {

View File

@ -122,7 +122,7 @@ func TestFIFO_addUpdate(t *testing.T) {
func TestFIFO_addReplace(t *testing.T) { func TestFIFO_addReplace(t *testing.T) {
f := NewHistorical(nil) f := NewHistorical(nil)
f.Add(&testObj{"foo", 10}) f.Add(&testObj{"foo", 10})
f.Replace([]interface{}{&testObj{"foo", 15}}) f.Replace([]interface{}{&testObj{"foo", 15}}, "0")
got := make(chan *testObj, 2) got := make(chan *testObj, 2)
go func() { go func() {
for { for {

View File

@ -893,11 +893,11 @@ func (psa *podStoreAdapter) Get(obj interface{}) (interface{}, bool, error) {
// Replace will delete the contents of the store, using instead the // Replace will delete the contents of the store, using instead the
// given map. This store implementation does NOT take ownership of the map. // given map. This store implementation does NOT take ownership of the map.
func (psa *podStoreAdapter) Replace(objs []interface{}) error { func (psa *podStoreAdapter) Replace(objs []interface{}, resourceVersion string) error {
newobjs := make([]interface{}, len(objs)) newobjs := make([]interface{}, len(objs))
for i, v := range objs { for i, v := range objs {
pod := v.(*api.Pod) pod := v.(*api.Pod)
newobjs[i] = &Pod{Pod: pod} newobjs[i] = &Pod{Pod: pod}
} }
return psa.FIFO.Replace(newobjs) return psa.FIFO.Replace(newobjs, resourceVersion)
} }

View File

@ -307,7 +307,7 @@ func (f *DeltaFIFO) Pop() interface{} {
// 'f' takes ownership of the map, you should not reference the map again // 'f' takes ownership of the map, you should not reference the map again
// after calling this function. f's queue is reset, too; upon return, it // after calling this function. f's queue is reset, too; upon return, it
// will contain the items in the map, in no particular order. // will contain the items in the map, in no particular order.
func (f *DeltaFIFO) Replace(list []interface{}) error { func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
for _, item := range list { for _, item := range list {

View File

@ -97,7 +97,7 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) {
) )
f.Add(mkFifoObj("foo", 10)) f.Add(mkFifoObj("foo", 10))
f.Update(mkFifoObj("foo", 12)) f.Update(mkFifoObj("foo", 12))
f.Replace([]interface{}{mkFifoObj("foo", 20)}) f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0")
f.Delete(mkFifoObj("foo", 15)) f.Delete(mkFifoObj("foo", 15))
f.Delete(mkFifoObj("foo", 18)) // flush the last one out f.Delete(mkFifoObj("foo", 18)) // flush the last one out
expect := []DeltaType{Added, Updated, Sync, Deleted} expect := []DeltaType{Added, Updated, Sync, Deleted}
@ -165,7 +165,7 @@ func TestDeltaFIFO_enqueueing(t *testing.T) {
func TestDeltaFIFO_addReplace(t *testing.T) { func TestDeltaFIFO_addReplace(t *testing.T) {
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
f.Add(mkFifoObj("foo", 10)) f.Add(mkFifoObj("foo", 10))
f.Replace([]interface{}{mkFifoObj("foo", 15)}) f.Replace([]interface{}{mkFifoObj("foo", 15)}, "0")
got := make(chan testFifoObject, 2) got := make(chan testFifoObject, 2)
go func() { go func() {
for { for {
@ -197,7 +197,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
}), }),
) )
f.Delete(mkFifoObj("baz", 10)) f.Delete(mkFifoObj("baz", 10))
f.Replace([]interface{}{mkFifoObj("foo", 5)}) f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
expectedList := []Deltas{ expectedList := []Deltas{
{{Deleted, mkFifoObj("baz", 10)}}, {{Deleted, mkFifoObj("baz", 10)}},

View File

@ -166,7 +166,7 @@ func (c *ExpirationCache) Delete(obj interface{}) error {
// Replace will convert all items in the given list to TimestampedEntries // Replace will convert all items in the given list to TimestampedEntries
// before attempting the replace operation. The replace operation will // before attempting the replace operation. The replace operation will
// delete the contents of the ExpirationCache `c`. // delete the contents of the ExpirationCache `c`.
func (c *ExpirationCache) Replace(list []interface{}) error { func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error {
items := map[string]interface{}{} items := map[string]interface{}{}
ts := c.clock.Now() ts := c.clock.Now()
for _, item := range list { for _, item := range list {
@ -176,7 +176,7 @@ func (c *ExpirationCache) Replace(list []interface{}) error {
} }
items[key] = &timestampedEntry{item, ts} items[key] = &timestampedEntry{item, ts}
} }
c.cacheStorage.Replace(items) c.cacheStorage.Replace(items, resourceVersion)
return nil return nil
} }

View File

@ -188,7 +188,7 @@ func (f *FIFO) Pop() interface{} {
// 'f' takes ownership of the map, you should not reference the map again // 'f' takes ownership of the map, you should not reference the map again
// after calling this function. f's queue is reset, too; upon return, it // after calling this function. f's queue is reset, too; upon return, it
// will contain the items in the map, in no particular order. // will contain the items in the map, in no particular order.
func (f *FIFO) Replace(list []interface{}) error { func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
items := map[string]interface{}{} items := map[string]interface{}{}
for _, item := range list { for _, item := range list {
key, err := f.keyFunc(item) key, err := f.keyFunc(item)

View File

@ -107,7 +107,7 @@ func TestFIFO_addUpdate(t *testing.T) {
func TestFIFO_addReplace(t *testing.T) { func TestFIFO_addReplace(t *testing.T) {
f := NewFIFO(testFifoObjectKeyFunc) f := NewFIFO(testFifoObjectKeyFunc)
f.Add(mkFifoObj("foo", 10)) f.Add(mkFifoObj("foo", 10))
f.Replace([]interface{}{mkFifoObj("foo", 15)}) f.Replace([]interface{}{mkFifoObj("foo", 15)}, "15")
got := make(chan testFifoObject, 2) got := make(chan testFifoObject, 2)
go func() { go func() {
for { for {

View File

@ -1,86 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util"
)
// Enumerator should be able to return the list of objects to be synced with
// one object at a time.
type Enumerator interface {
Len() int
Get(index int) (object interface{})
}
// GetFunc should return an enumerator that you wish the Poller to process.
type GetFunc func() (Enumerator, error)
// Poller is like Reflector, but it periodically polls instead of watching.
// This is intended to be a workaround for api objects that don't yet support
// watching.
type Poller struct {
getFunc GetFunc
period time.Duration
store Store
}
// NewPoller constructs a new poller. Note that polling probably doesn't make much
// sense to use along with the FIFO queue. The returned Poller will call getFunc and
// sync the objects in 'store' with the returned Enumerator, waiting 'period' between
// each call. It probably only makes sense to use a poller if you're treating the
// store as read-only.
func NewPoller(getFunc GetFunc, period time.Duration, store Store) *Poller {
return &Poller{
getFunc: getFunc,
period: period,
store: store,
}
}
// Run begins polling. It starts a goroutine and returns immediately.
func (p *Poller) Run() {
go util.Until(p.run, p.period, util.NeverStop)
}
// RunUntil begins polling. It starts a goroutine and returns immediately.
// It will stop when the stopCh is closed.
func (p *Poller) RunUntil(stopCh <-chan struct{}) {
go util.Until(p.run, p.period, stopCh)
}
func (p *Poller) run() {
e, err := p.getFunc()
if err != nil {
glog.Errorf("failed to list: %v", err)
return
}
p.sync(e)
}
func (p *Poller) sync(e Enumerator) {
items := []interface{}{}
for i := 0; i < e.Len(); i++ {
object := e.Get(i)
items = append(items, object)
}
p.store.Replace(items)
}

View File

@ -1,132 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"errors"
"reflect"
"testing"
"time"
)
func testPairKeyFunc(obj interface{}) (string, error) {
return obj.(testPair).id, nil
}
type testPair struct {
id string
obj interface{}
}
type testEnumerator []testPair
func (t testEnumerator) Len() int { return len(t) }
func (t testEnumerator) Get(i int) interface{} {
return t[i]
}
func TestPoller_sync(t *testing.T) {
table := []struct {
// each step simulates the list that a getFunc would receive.
steps [][]testPair
}{
{
steps: [][]testPair{
{
{"foo", "foo1"},
{"bar", "bar1"},
{"baz", "baz1"},
{"qux", "qux1"},
}, {
{"foo", "foo2"},
{"bar", "bar2"},
{"qux", "qux2"},
}, {
{"bar", "bar3"},
{"baz", "baz2"},
{"qux", "qux3"},
}, {
{"qux", "qux4"},
}, {
{"foo", "foo3"},
},
},
},
}
for testCase, item := range table {
s := NewStore(testPairKeyFunc)
// This is a unit test for the sync function, hence the nil getFunc.
p := NewPoller(nil, 0, s)
for line, pairs := range item.steps {
p.sync(testEnumerator(pairs))
list := s.List()
for _, pair := range pairs {
foundInList := false
for _, listItem := range list {
id, _ := testPairKeyFunc(listItem)
if pair.id == id {
foundInList = true
}
}
if !foundInList {
t.Errorf("%v, %v: expected to find list entry for %v, but did not.", testCase, line, pair.id)
continue
}
found, ok, _ := s.Get(pair)
if !ok {
t.Errorf("%v, %v: unexpected absent entry for %v", testCase, line, pair.id)
continue
}
if e, a := pair.obj, found.(testPair).obj; !reflect.DeepEqual(e, a) {
t.Errorf("%v, %v: expected %v, got %v for %v", testCase, line, e, a, pair.id)
}
}
if e, a := len(pairs), len(list); e != a {
t.Errorf("%v, %v: expected len %v, got %v", testCase, line, e, a)
}
}
}
}
func TestPoller_Run(t *testing.T) {
stopCh := make(chan struct{})
defer func() { stopCh <- struct{}{} }()
s := NewStore(testPairKeyFunc)
const count = 10
var called = 0
done := make(chan struct{})
NewPoller(func() (Enumerator, error) {
called++
if called == count {
close(done)
}
// test both error and regular returns.
if called&1 == 0 {
return testEnumerator{}, nil
}
return nil, errors.New("transient error")
}, time.Millisecond, s).RunUntil(stopCh)
// The test here is that we get called at least count times.
<-done
// We never added anything, verify that.
if e, a := 0, len(s.List()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}

View File

@ -234,12 +234,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
for _, item := range items { for _, item := range items {
found = append(found, item) found = append(found, item)
} }
return r.store.Replace(found, resourceVersion)
myStore, ok := r.store.(*WatchCache)
if ok {
return myStore.ReplaceWithVersion(found, resourceVersion)
}
return r.store.Replace(found)
} }
// watchHandler watches w and keeps *resourceVersion up to date. // watchHandler watches w and keeps *resourceVersion up to date.

View File

@ -359,34 +359,3 @@ func TestReflector_ListAndWatchWithErrors(t *testing.T) {
r.ListAndWatch(util.NeverStop) r.ListAndWatch(util.NeverStop)
} }
} }
func TestReflectorForWatchCache(t *testing.T) {
store := NewWatchCache(5)
{
_, version := store.ListWithVersion()
if version != 0 {
t.Errorf("unexpected resource version: %d", version)
}
}
lw := &testLW{
WatchFunc: func(rv string) (watch.Interface, error) {
fw := watch.NewFake()
go fw.Stop()
return fw, nil
},
ListFunc: func() (runtime.Object, error) {
return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "10"}}, nil
},
}
r := NewReflector(lw, &api.Pod{}, store, 0)
r.ListAndWatch(util.NeverStop)
{
_, version := store.ListWithVersion()
if version != 10 {
t.Errorf("unexpected resource version: %d", version)
}
}
}

View File

@ -43,7 +43,7 @@ type Store interface {
// Replace will delete the contents of the store, using instead the // Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference // given list. Store takes ownership of the list, you should not reference
// it after calling this function. // it after calling this function.
Replace([]interface{}) error Replace([]interface{}, string) error
} }
// KeyFunc knows how to make a key from an object. Implementations should be deterministic. // KeyFunc knows how to make a key from an object. Implementations should be deterministic.
@ -193,7 +193,7 @@ func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error)
// Replace will delete the contents of 'c', using instead the given list. // Replace will delete the contents of 'c', using instead the given list.
// 'c' takes ownership of the list, you should not reference the list again // 'c' takes ownership of the list, you should not reference the list again
// after calling this function. // after calling this function.
func (c *cache) Replace(list []interface{}) error { func (c *cache) Replace(list []interface{}, resourceVersion string) error {
items := map[string]interface{}{} items := map[string]interface{}{}
for _, item := range list { for _, item := range list {
key, err := c.keyFunc(item) key, err := c.keyFunc(item)
@ -202,7 +202,7 @@ func (c *cache) Replace(list []interface{}) error {
} }
items[key] = item items[key] = item
} }
c.cacheStorage.Replace(items) c.cacheStorage.Replace(items, resourceVersion)
return nil return nil
} }

View File

@ -70,7 +70,7 @@ func doTestStore(t *testing.T, store Store) {
store.Replace([]interface{}{ store.Replace([]interface{}{
mkObj("foo", "foo"), mkObj("foo", "foo"),
mkObj("bar", "bar"), mkObj("bar", "bar"),
}) }, "0")
{ {
found := util.StringSet{} found := util.StringSet{}

View File

@ -41,7 +41,7 @@ type ThreadSafeStore interface {
Get(key string) (item interface{}, exists bool) Get(key string) (item interface{}, exists bool)
List() []interface{} List() []interface{}
ListKeys() []string ListKeys() []string
Replace(map[string]interface{}) Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error) Index(indexName string, obj interface{}) ([]interface{}, error)
ListIndexFuncValues(name string) []string ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error) ByIndex(indexName, indexKey string) ([]interface{}, error)
@ -112,7 +112,7 @@ func (c *threadSafeMap) ListKeys() []string {
return list return list
} }
func (c *threadSafeMap) Replace(items map[string]interface{}) { func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
c.items = items c.items = items

View File

@ -66,8 +66,8 @@ func (u *UndeltaStore) Delete(obj interface{}) error {
return nil return nil
} }
func (u *UndeltaStore) Replace(list []interface{}) error { func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error {
if err := u.Store.Replace(list); err != nil { if err := u.Store.Replace(list, resourceVersion); err != nil {
return err return err
} }
u.PushFunc(u.Store.List()) u.PushFunc(u.Store.List())

View File

@ -120,7 +120,7 @@ func TestReplaceCallsPush(t *testing.T) {
m := []interface{}{mkObj("a", 1)} m := []interface{}{mkObj("a", 1)}
u.Replace(m) u.Replace(m, "0")
if callcount != 1 { if callcount != 1 {
t.Errorf("Expected 1 calls, got %d", callcount) t.Errorf("Expected 1 calls, got %d", callcount)
} }

View File

@ -910,7 +910,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
// This should have no effect, since we've deleted the rc. // This should have no effect, since we've deleted the rc.
podExp.Seen(1, 0) podExp.Seen(1, 0)
manager.podStore.Store.Replace(make([]interface{}, 0)) manager.podStore.Store.Replace(make([]interface{}, 0), "0")
manager.syncReplicationController(getKey(rc, t)) manager.syncReplicationController(getKey(rc, t))
validateSyncReplication(t, &fakePodControl, 0, 0) validateSyncReplication(t, &fakePodControl, 0, 0)
} }

View File

@ -86,7 +86,7 @@ type Cacher struct {
storage Interface storage Interface
// "sliding window" of recent changes of objects and the current state. // "sliding window" of recent changes of objects and the current state.
watchCache *cache.WatchCache watchCache *watchCache
reflector *cache.Reflector reflector *cache.Reflector
// Registered watchers. // Registered watchers.
@ -104,7 +104,7 @@ type Cacher struct {
// internal cache and updating its cache in the background based on the given // internal cache and updating its cache in the background based on the given
// configuration. // configuration.
func NewCacher(config CacherConfig) *Cacher { func NewCacher(config CacherConfig) *Cacher {
watchCache := cache.NewWatchCache(config.CacheCapacity) watchCache := newWatchCache(config.CacheCapacity)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
cacher := &Cacher{ cacher := &Cacher{
@ -272,7 +272,7 @@ func (c *Cacher) Codec() runtime.Codec {
return c.storage.Codec() return c.storage.Codec()
} }
func (c *Cacher) processEvent(event cache.WatchCacheEvent) { func (c *Cacher) processEvent(event watchCacheEvent) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
for _, watcher := range c.watchers { for _, watcher := range c.watchers {
@ -361,16 +361,16 @@ func (lw *cacherListerWatcher) Watch(resourceVersion string) (watch.Interface, e
// cacherWatch implements watch.Interface // cacherWatch implements watch.Interface
type cacheWatcher struct { type cacheWatcher struct {
sync.Mutex sync.Mutex
input chan cache.WatchCacheEvent input chan watchCacheEvent
result chan watch.Event result chan watch.Event
filter FilterFunc filter FilterFunc
stopped bool stopped bool
forget func() forget func()
} }
func newCacheWatcher(initEvents []cache.WatchCacheEvent, filter FilterFunc, forget func()) *cacheWatcher { func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func()) *cacheWatcher {
watcher := &cacheWatcher{ watcher := &cacheWatcher{
input: make(chan cache.WatchCacheEvent, 10), input: make(chan watchCacheEvent, 10),
result: make(chan watch.Event, 10), result: make(chan watch.Event, 10),
filter: filter, filter: filter,
stopped: false, stopped: false,
@ -400,11 +400,11 @@ func (c *cacheWatcher) stop() {
} }
} }
func (c *cacheWatcher) add(event cache.WatchCacheEvent) { func (c *cacheWatcher) add(event watchCacheEvent) {
c.input <- event c.input <- event
} }
func (c *cacheWatcher) sendWatchCacheEvent(event cache.WatchCacheEvent) { func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
curObjPasses := event.Type != watch.Deleted && c.filter(event.Object) curObjPasses := event.Type != watch.Deleted && c.filter(event.Object)
oldObjPasses := false oldObjPasses := false
if event.PrevObject != nil { if event.PrevObject != nil {
@ -430,7 +430,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event cache.WatchCacheEvent) {
} }
} }
func (c *cacheWatcher) process(initEvents []cache.WatchCacheEvent) { func (c *cacheWatcher) process(initEvents []watchCacheEvent) {
for _, event := range initEvents { for _, event := range initEvents {
c.sendWatchCacheEvent(event) c.sendWatchCacheEvent(event)
} }

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package cache package storage
import ( import (
"fmt" "fmt"
@ -23,20 +23,16 @@ import (
"sync" "sync"
"k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/client/unversioned/cache"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
) )
// TODO(wojtek-t): All structure in this file should be private to // watchCacheEvent is a single "watch event" that is send to users of
// pkg/storage package. We should remove the reference to WatchCache // watchCache. Additionally to a typical "watch.Event" it contains
// from Reflector (by changing the Replace method signature in Store
// interface to take resource version too) and move it under pkg/storage.
// WatchCacheEvent is a single "watch event" that is send to users of
// WatchCache. Additionally to a typical "watch.Event" it contains
// the previous value of the object to enable proper filtering in the // the previous value of the object to enable proper filtering in the
// upper layers. // upper layers.
type WatchCacheEvent struct { type watchCacheEvent struct {
Type watch.EventType Type watch.EventType
Object runtime.Object Object runtime.Object
PrevObject runtime.Object PrevObject runtime.Object
@ -47,15 +43,15 @@ type WatchCacheEvent struct {
// itself. // itself.
type watchCacheElement struct { type watchCacheElement struct {
resourceVersion uint64 resourceVersion uint64
watchCacheEvent WatchCacheEvent watchCacheEvent watchCacheEvent
} }
// WatchCache implements a Store interface. // watchCache implements a Store interface.
// However, it depends on the elements implementing runtime.Object interface. // However, it depends on the elements implementing runtime.Object interface.
// //
// WatchCache is a "sliding window" (with a limitted capacity) of objects // watchCache is a "sliding window" (with a limitted capacity) of objects
// observed from a watch. // observed from a watch.
type WatchCache struct { type watchCache struct {
sync.RWMutex sync.RWMutex
// Maximum size of history window. // Maximum size of history window.
@ -73,9 +69,9 @@ type WatchCache struct {
// store will effectively support LIST operation from the "end of cache // store will effectively support LIST operation from the "end of cache
// history" i.e. from the moment just after the newest cached watched event. // history" i.e. from the moment just after the newest cached watched event.
// It is necessary to effectively allow clients to start watching at now. // It is necessary to effectively allow clients to start watching at now.
store Store store cache.Store
// ResourceVersion up to which the WatchCache is propagated. // ResourceVersion up to which the watchCache is propagated.
resourceVersion uint64 resourceVersion uint64
// This handler is run at the end of every successful Replace() method. // This handler is run at the end of every successful Replace() method.
@ -83,21 +79,21 @@ type WatchCache struct {
// This handler is run at the end of every Add/Update/Delete method // This handler is run at the end of every Add/Update/Delete method
// and additionally gets the previous value of the object. // and additionally gets the previous value of the object.
onEvent func(WatchCacheEvent) onEvent func(watchCacheEvent)
} }
func NewWatchCache(capacity int) *WatchCache { func newWatchCache(capacity int) *watchCache {
return &WatchCache{ return &watchCache{
capacity: capacity, capacity: capacity,
cache: make([]watchCacheElement, capacity), cache: make([]watchCacheElement, capacity),
startIndex: 0, startIndex: 0,
endIndex: 0, endIndex: 0,
store: NewStore(MetaNamespaceKeyFunc), store: cache.NewStore(cache.MetaNamespaceKeyFunc),
resourceVersion: 0, resourceVersion: 0,
} }
} }
func (w *WatchCache) Add(obj interface{}) error { func (w *watchCache) Add(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj) object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
if err != nil { if err != nil {
return err return err
@ -108,7 +104,7 @@ func (w *WatchCache) Add(obj interface{}) error {
return w.processEvent(event, resourceVersion, f) return w.processEvent(event, resourceVersion, f)
} }
func (w *WatchCache) Update(obj interface{}) error { func (w *watchCache) Update(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj) object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
if err != nil { if err != nil {
return err return err
@ -119,7 +115,7 @@ func (w *WatchCache) Update(obj interface{}) error {
return w.processEvent(event, resourceVersion, f) return w.processEvent(event, resourceVersion, f)
} }
func (w *WatchCache) Delete(obj interface{}) error { func (w *watchCache) Delete(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj) object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
if err != nil { if err != nil {
return err return err
@ -153,7 +149,7 @@ func parseResourceVersion(resourceVersion string) (uint64, error) {
return strconv.ParseUint(resourceVersion, 10, 64) return strconv.ParseUint(resourceVersion, 10, 64)
} }
func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error { func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error {
w.Lock() w.Lock()
defer w.Unlock() defer w.Unlock()
previous, exists, err := w.store.Get(event.Object) previous, exists, err := w.store.Get(event.Object)
@ -166,7 +162,7 @@ func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, upd
} else { } else {
prevObject = nil prevObject = nil
} }
watchCacheEvent := WatchCacheEvent{event.Type, event.Object, prevObject} watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject}
if w.onEvent != nil { if w.onEvent != nil {
w.onEvent(watchCacheEvent) w.onEvent(watchCacheEvent)
} }
@ -176,7 +172,7 @@ func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, upd
} }
// Assumes that lock is already held for write. // Assumes that lock is already held for write.
func (w *WatchCache) updateCache(resourceVersion uint64, event WatchCacheEvent) { func (w *watchCache) updateCache(resourceVersion uint64, event watchCacheEvent) {
if w.endIndex == w.startIndex+w.capacity { if w.endIndex == w.startIndex+w.capacity {
// Cache is full - remove the oldest element. // Cache is full - remove the oldest element.
w.startIndex++ w.startIndex++
@ -185,41 +181,37 @@ func (w *WatchCache) updateCache(resourceVersion uint64, event WatchCacheEvent)
w.endIndex++ w.endIndex++
} }
func (w *WatchCache) List() []interface{} { func (w *watchCache) List() []interface{} {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.store.List() return w.store.List()
} }
func (w *WatchCache) ListWithVersion() ([]interface{}, uint64) { func (w *watchCache) ListWithVersion() ([]interface{}, uint64) {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.store.List(), w.resourceVersion return w.store.List(), w.resourceVersion
} }
func (w *WatchCache) ListKeys() []string { func (w *watchCache) ListKeys() []string {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.store.ListKeys() return w.store.ListKeys()
} }
func (w *WatchCache) Get(obj interface{}) (interface{}, bool, error) { func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.store.Get(obj) return w.store.Get(obj)
} }
func (w *WatchCache) GetByKey(key string) (interface{}, bool, error) { func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.store.GetByKey(key) return w.store.GetByKey(key)
} }
func (w *WatchCache) Replace(objs []interface{}) error { func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
return w.ReplaceWithVersion(objs, "0")
}
func (w *WatchCache) ReplaceWithVersion(objs []interface{}, resourceVersion string) error {
version, err := parseResourceVersion(resourceVersion) version, err := parseResourceVersion(resourceVersion)
if err != nil { if err != nil {
return err return err
@ -230,7 +222,7 @@ func (w *WatchCache) ReplaceWithVersion(objs []interface{}, resourceVersion stri
w.startIndex = 0 w.startIndex = 0
w.endIndex = 0 w.endIndex = 0
if err := w.store.Replace(objs); err != nil { if err := w.store.Replace(objs, resourceVersion); err != nil {
return err return err
} }
w.resourceVersion = version w.resourceVersion = version
@ -240,19 +232,19 @@ func (w *WatchCache) ReplaceWithVersion(objs []interface{}, resourceVersion stri
return nil return nil
} }
func (w *WatchCache) SetOnReplace(onReplace func()) { func (w *watchCache) SetOnReplace(onReplace func()) {
w.Lock() w.Lock()
defer w.Unlock() defer w.Unlock()
w.onReplace = onReplace w.onReplace = onReplace
} }
func (w *WatchCache) SetOnEvent(onEvent func(WatchCacheEvent)) { func (w *watchCache) SetOnEvent(onEvent func(watchCacheEvent)) {
w.Lock() w.Lock()
defer w.Unlock() defer w.Unlock()
w.onEvent = onEvent w.onEvent = onEvent
} }
func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]WatchCacheEvent, error) { func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]watchCacheEvent, error) {
size := w.endIndex - w.startIndex size := w.endIndex - w.startIndex
oldest := w.resourceVersion oldest := w.resourceVersion
if size > 0 { if size > 0 {
@ -268,14 +260,14 @@ func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]Wa
return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion
} }
first := sort.Search(size, f) first := sort.Search(size, f)
result := make([]WatchCacheEvent, size-first) result := make([]watchCacheEvent, size-first)
for i := 0; i < size-first; i++ { for i := 0; i < size-first; i++ {
result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
} }
return result, nil return result, nil
} }
func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]WatchCacheEvent, error) { func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]watchCacheEvent, error) {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.GetAllEventsSinceThreadUnsafe(resourceVersion) return w.GetAllEventsSinceThreadUnsafe(resourceVersion)

View File

@ -14,13 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package cache package storage
import ( import (
"strconv" "strconv"
"testing" "testing"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/unversioned/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
) )
@ -36,7 +38,7 @@ func makeTestPod(name string, resourceVersion uint64) *api.Pod {
} }
func TestWatchCacheBasic(t *testing.T) { func TestWatchCacheBasic(t *testing.T) {
store := NewWatchCache(2) store := newWatchCache(2)
// Test Add/Update/Delete. // Test Add/Update/Delete.
pod1 := makeTestPod("pod", 1) pod1 := makeTestPod("pod", 1)
@ -90,7 +92,7 @@ func TestWatchCacheBasic(t *testing.T) {
store.Replace([]interface{}{ store.Replace([]interface{}{
makeTestPod("pod4", 7), makeTestPod("pod4", 7),
makeTestPod("pod5", 8), makeTestPod("pod5", 8),
}) }, "8")
{ {
podNames := util.StringSet{} podNames := util.StringSet{}
for _, item := range store.List() { for _, item := range store.List() {
@ -106,7 +108,7 @@ func TestWatchCacheBasic(t *testing.T) {
} }
func TestEvents(t *testing.T) { func TestEvents(t *testing.T) {
store := NewWatchCache(5) store := newWatchCache(5)
store.Add(makeTestPod("pod", 2)) store.Add(makeTestPod("pod", 2))
@ -221,3 +223,44 @@ func TestEvents(t *testing.T) {
} }
} }
} }
type testLW struct {
ListFunc func() (runtime.Object, error)
WatchFunc func(resourceVersion string) (watch.Interface, error)
}
func (t *testLW) List() (runtime.Object, error) { return t.ListFunc() }
func (t *testLW) Watch(resourceVersion string) (watch.Interface, error) {
return t.WatchFunc(resourceVersion)
}
func TestReflectorForWatchCache(t *testing.T) {
store := newWatchCache(5)
{
_, version := store.ListWithVersion()
if version != 0 {
t.Errorf("unexpected resource version: %d", version)
}
}
lw := &testLW{
WatchFunc: func(rv string) (watch.Interface, error) {
fw := watch.NewFake()
go fw.Stop()
return fw, nil
},
ListFunc: func() (runtime.Object, error) {
return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "10"}}, nil
},
}
r := cache.NewReflector(lw, &api.Pod{}, store, 0)
r.ListAndWatch(util.NeverStop)
{
_, version := store.ListWithVersion()
if version != 10 {
t.Errorf("unexpected resource version: %d", version)
}
}
}

View File

@ -131,7 +131,7 @@ func replacePods(pods []*api.Pod, store cache.Store) {
for i := range pods { for i := range pods {
found = append(found, pods[i]) found = append(found, pods[i])
} }
expectNoError(store.Replace(found)) expectNoError(store.Replace(found, "0"))
} }
// getContainerRestarts returns the count of container restarts across all pods matching the given labelSelector, // getContainerRestarts returns the count of container restarts across all pods matching the given labelSelector,