Move WatchCache to pkg/storage

This commit is contained in:
Wojciech Tyczynski 2015-08-18 10:40:23 +02:00
parent e202f9c797
commit d318b22f65
4 changed files with 87 additions and 79 deletions

View File

@ -359,34 +359,3 @@ func TestReflector_ListAndWatchWithErrors(t *testing.T) {
r.ListAndWatch(util.NeverStop) r.ListAndWatch(util.NeverStop)
} }
} }
func TestReflectorForWatchCache(t *testing.T) {
store := NewWatchCache(5)
{
_, version := store.ListWithVersion()
if version != 0 {
t.Errorf("unexpected resource version: %d", version)
}
}
lw := &testLW{
WatchFunc: func(rv string) (watch.Interface, error) {
fw := watch.NewFake()
go fw.Stop()
return fw, nil
},
ListFunc: func() (runtime.Object, error) {
return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "10"}}, nil
},
}
r := NewReflector(lw, &api.Pod{}, store, 0)
r.ListAndWatch(util.NeverStop)
{
_, version := store.ListWithVersion()
if version != 10 {
t.Errorf("unexpected resource version: %d", version)
}
}
}

View File

@ -86,7 +86,7 @@ type Cacher struct {
storage Interface storage Interface
// "sliding window" of recent changes of objects and the current state. // "sliding window" of recent changes of objects and the current state.
watchCache *cache.WatchCache watchCache *watchCache
reflector *cache.Reflector reflector *cache.Reflector
// Registered watchers. // Registered watchers.
@ -104,7 +104,7 @@ type Cacher struct {
// internal cache and updating its cache in the background based on the given // internal cache and updating its cache in the background based on the given
// configuration. // configuration.
func NewCacher(config CacherConfig) *Cacher { func NewCacher(config CacherConfig) *Cacher {
watchCache := cache.NewWatchCache(config.CacheCapacity) watchCache := newWatchCache(config.CacheCapacity)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
cacher := &Cacher{ cacher := &Cacher{
@ -272,7 +272,7 @@ func (c *Cacher) Codec() runtime.Codec {
return c.storage.Codec() return c.storage.Codec()
} }
func (c *Cacher) processEvent(event cache.WatchCacheEvent) { func (c *Cacher) processEvent(event watchCacheEvent) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
for _, watcher := range c.watchers { for _, watcher := range c.watchers {
@ -361,16 +361,16 @@ func (lw *cacherListerWatcher) Watch(resourceVersion string) (watch.Interface, e
// cacherWatch implements watch.Interface // cacherWatch implements watch.Interface
type cacheWatcher struct { type cacheWatcher struct {
sync.Mutex sync.Mutex
input chan cache.WatchCacheEvent input chan watchCacheEvent
result chan watch.Event result chan watch.Event
filter FilterFunc filter FilterFunc
stopped bool stopped bool
forget func() forget func()
} }
func newCacheWatcher(initEvents []cache.WatchCacheEvent, filter FilterFunc, forget func()) *cacheWatcher { func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func()) *cacheWatcher {
watcher := &cacheWatcher{ watcher := &cacheWatcher{
input: make(chan cache.WatchCacheEvent, 10), input: make(chan watchCacheEvent, 10),
result: make(chan watch.Event, 10), result: make(chan watch.Event, 10),
filter: filter, filter: filter,
stopped: false, stopped: false,
@ -400,11 +400,11 @@ func (c *cacheWatcher) stop() {
} }
} }
func (c *cacheWatcher) add(event cache.WatchCacheEvent) { func (c *cacheWatcher) add(event watchCacheEvent) {
c.input <- event c.input <- event
} }
func (c *cacheWatcher) sendWatchCacheEvent(event cache.WatchCacheEvent) { func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
curObjPasses := event.Type != watch.Deleted && c.filter(event.Object) curObjPasses := event.Type != watch.Deleted && c.filter(event.Object)
oldObjPasses := false oldObjPasses := false
if event.PrevObject != nil { if event.PrevObject != nil {
@ -430,7 +430,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event cache.WatchCacheEvent) {
} }
} }
func (c *cacheWatcher) process(initEvents []cache.WatchCacheEvent) { func (c *cacheWatcher) process(initEvents []watchCacheEvent) {
for _, event := range initEvents { for _, event := range initEvents {
c.sendWatchCacheEvent(event) c.sendWatchCacheEvent(event)
} }

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package cache package storage
import ( import (
"fmt" "fmt"
@ -23,20 +23,16 @@ import (
"sync" "sync"
"k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/client/unversioned/cache"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
) )
// TODO(wojtek-t): All structure in this file should be private to // watchCacheEvent is a single "watch event" that is send to users of
// pkg/storage package. We should remove the reference to WatchCache // watchCache. Additionally to a typical "watch.Event" it contains
// from Reflector (by changing the Replace method signature in Store
// interface to take resource version too) and move it under pkg/storage.
// WatchCacheEvent is a single "watch event" that is send to users of
// WatchCache. Additionally to a typical "watch.Event" it contains
// the previous value of the object to enable proper filtering in the // the previous value of the object to enable proper filtering in the
// upper layers. // upper layers.
type WatchCacheEvent struct { type watchCacheEvent struct {
Type watch.EventType Type watch.EventType
Object runtime.Object Object runtime.Object
PrevObject runtime.Object PrevObject runtime.Object
@ -47,15 +43,15 @@ type WatchCacheEvent struct {
// itself. // itself.
type watchCacheElement struct { type watchCacheElement struct {
resourceVersion uint64 resourceVersion uint64
watchCacheEvent WatchCacheEvent watchCacheEvent watchCacheEvent
} }
// WatchCache implements a Store interface. // watchCache implements a Store interface.
// However, it depends on the elements implementing runtime.Object interface. // However, it depends on the elements implementing runtime.Object interface.
// //
// WatchCache is a "sliding window" (with a limitted capacity) of objects // watchCache is a "sliding window" (with a limitted capacity) of objects
// observed from a watch. // observed from a watch.
type WatchCache struct { type watchCache struct {
sync.RWMutex sync.RWMutex
// Maximum size of history window. // Maximum size of history window.
@ -73,9 +69,9 @@ type WatchCache struct {
// store will effectively support LIST operation from the "end of cache // store will effectively support LIST operation from the "end of cache
// history" i.e. from the moment just after the newest cached watched event. // history" i.e. from the moment just after the newest cached watched event.
// It is necessary to effectively allow clients to start watching at now. // It is necessary to effectively allow clients to start watching at now.
store Store store cache.Store
// ResourceVersion up to which the WatchCache is propagated. // ResourceVersion up to which the watchCache is propagated.
resourceVersion uint64 resourceVersion uint64
// This handler is run at the end of every successful Replace() method. // This handler is run at the end of every successful Replace() method.
@ -83,21 +79,21 @@ type WatchCache struct {
// This handler is run at the end of every Add/Update/Delete method // This handler is run at the end of every Add/Update/Delete method
// and additionally gets the previous value of the object. // and additionally gets the previous value of the object.
onEvent func(WatchCacheEvent) onEvent func(watchCacheEvent)
} }
func NewWatchCache(capacity int) *WatchCache { func newWatchCache(capacity int) *watchCache {
return &WatchCache{ return &watchCache{
capacity: capacity, capacity: capacity,
cache: make([]watchCacheElement, capacity), cache: make([]watchCacheElement, capacity),
startIndex: 0, startIndex: 0,
endIndex: 0, endIndex: 0,
store: NewStore(MetaNamespaceKeyFunc), store: cache.NewStore(cache.MetaNamespaceKeyFunc),
resourceVersion: 0, resourceVersion: 0,
} }
} }
func (w *WatchCache) Add(obj interface{}) error { func (w *watchCache) Add(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj) object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
if err != nil { if err != nil {
return err return err
@ -108,7 +104,7 @@ func (w *WatchCache) Add(obj interface{}) error {
return w.processEvent(event, resourceVersion, f) return w.processEvent(event, resourceVersion, f)
} }
func (w *WatchCache) Update(obj interface{}) error { func (w *watchCache) Update(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj) object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
if err != nil { if err != nil {
return err return err
@ -119,7 +115,7 @@ func (w *WatchCache) Update(obj interface{}) error {
return w.processEvent(event, resourceVersion, f) return w.processEvent(event, resourceVersion, f)
} }
func (w *WatchCache) Delete(obj interface{}) error { func (w *watchCache) Delete(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj) object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
if err != nil { if err != nil {
return err return err
@ -153,7 +149,7 @@ func parseResourceVersion(resourceVersion string) (uint64, error) {
return strconv.ParseUint(resourceVersion, 10, 64) return strconv.ParseUint(resourceVersion, 10, 64)
} }
func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error { func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error {
w.Lock() w.Lock()
defer w.Unlock() defer w.Unlock()
previous, exists, err := w.store.Get(event.Object) previous, exists, err := w.store.Get(event.Object)
@ -166,7 +162,7 @@ func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, upd
} else { } else {
prevObject = nil prevObject = nil
} }
watchCacheEvent := WatchCacheEvent{event.Type, event.Object, prevObject} watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject}
if w.onEvent != nil { if w.onEvent != nil {
w.onEvent(watchCacheEvent) w.onEvent(watchCacheEvent)
} }
@ -176,7 +172,7 @@ func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, upd
} }
// Assumes that lock is already held for write. // Assumes that lock is already held for write.
func (w *WatchCache) updateCache(resourceVersion uint64, event WatchCacheEvent) { func (w *watchCache) updateCache(resourceVersion uint64, event watchCacheEvent) {
if w.endIndex == w.startIndex+w.capacity { if w.endIndex == w.startIndex+w.capacity {
// Cache is full - remove the oldest element. // Cache is full - remove the oldest element.
w.startIndex++ w.startIndex++
@ -185,37 +181,37 @@ func (w *WatchCache) updateCache(resourceVersion uint64, event WatchCacheEvent)
w.endIndex++ w.endIndex++
} }
func (w *WatchCache) List() []interface{} { func (w *watchCache) List() []interface{} {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.store.List() return w.store.List()
} }
func (w *WatchCache) ListWithVersion() ([]interface{}, uint64) { func (w *watchCache) ListWithVersion() ([]interface{}, uint64) {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.store.List(), w.resourceVersion return w.store.List(), w.resourceVersion
} }
func (w *WatchCache) ListKeys() []string { func (w *watchCache) ListKeys() []string {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.store.ListKeys() return w.store.ListKeys()
} }
func (w *WatchCache) Get(obj interface{}) (interface{}, bool, error) { func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.store.Get(obj) return w.store.Get(obj)
} }
func (w *WatchCache) GetByKey(key string) (interface{}, bool, error) { func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.store.GetByKey(key) return w.store.GetByKey(key)
} }
func (w *WatchCache) Replace(objs []interface{}, resourceVersion string) error { func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
version, err := parseResourceVersion(resourceVersion) version, err := parseResourceVersion(resourceVersion)
if err != nil { if err != nil {
return err return err
@ -236,19 +232,19 @@ func (w *WatchCache) Replace(objs []interface{}, resourceVersion string) error {
return nil return nil
} }
func (w *WatchCache) SetOnReplace(onReplace func()) { func (w *watchCache) SetOnReplace(onReplace func()) {
w.Lock() w.Lock()
defer w.Unlock() defer w.Unlock()
w.onReplace = onReplace w.onReplace = onReplace
} }
func (w *WatchCache) SetOnEvent(onEvent func(WatchCacheEvent)) { func (w *watchCache) SetOnEvent(onEvent func(watchCacheEvent)) {
w.Lock() w.Lock()
defer w.Unlock() defer w.Unlock()
w.onEvent = onEvent w.onEvent = onEvent
} }
func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]WatchCacheEvent, error) { func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]watchCacheEvent, error) {
size := w.endIndex - w.startIndex size := w.endIndex - w.startIndex
oldest := w.resourceVersion oldest := w.resourceVersion
if size > 0 { if size > 0 {
@ -264,14 +260,14 @@ func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]Wa
return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion
} }
first := sort.Search(size, f) first := sort.Search(size, f)
result := make([]WatchCacheEvent, size-first) result := make([]watchCacheEvent, size-first)
for i := 0; i < size-first; i++ { for i := 0; i < size-first; i++ {
result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
} }
return result, nil return result, nil
} }
func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]WatchCacheEvent, error) { func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]watchCacheEvent, error) {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.GetAllEventsSinceThreadUnsafe(resourceVersion) return w.GetAllEventsSinceThreadUnsafe(resourceVersion)

View File

@ -14,13 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package cache package storage
import ( import (
"strconv" "strconv"
"testing" "testing"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/unversioned/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
) )
@ -36,7 +38,7 @@ func makeTestPod(name string, resourceVersion uint64) *api.Pod {
} }
func TestWatchCacheBasic(t *testing.T) { func TestWatchCacheBasic(t *testing.T) {
store := NewWatchCache(2) store := newWatchCache(2)
// Test Add/Update/Delete. // Test Add/Update/Delete.
pod1 := makeTestPod("pod", 1) pod1 := makeTestPod("pod", 1)
@ -106,7 +108,7 @@ func TestWatchCacheBasic(t *testing.T) {
} }
func TestEvents(t *testing.T) { func TestEvents(t *testing.T) {
store := NewWatchCache(5) store := newWatchCache(5)
store.Add(makeTestPod("pod", 2)) store.Add(makeTestPod("pod", 2))
@ -221,3 +223,44 @@ func TestEvents(t *testing.T) {
} }
} }
} }
type testLW struct {
ListFunc func() (runtime.Object, error)
WatchFunc func(resourceVersion string) (watch.Interface, error)
}
func (t *testLW) List() (runtime.Object, error) { return t.ListFunc() }
func (t *testLW) Watch(resourceVersion string) (watch.Interface, error) {
return t.WatchFunc(resourceVersion)
}
func TestReflectorForWatchCache(t *testing.T) {
store := newWatchCache(5)
{
_, version := store.ListWithVersion()
if version != 0 {
t.Errorf("unexpected resource version: %d", version)
}
}
lw := &testLW{
WatchFunc: func(rv string) (watch.Interface, error) {
fw := watch.NewFake()
go fw.Stop()
return fw, nil
},
ListFunc: func() (runtime.Object, error) {
return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "10"}}, nil
},
}
r := cache.NewReflector(lw, &api.Pod{}, store, 0)
r.ListAndWatch(util.NeverStop)
{
_, version := store.ListWithVersion()
if version != 10 {
t.Errorf("unexpected resource version: %d", version)
}
}
}