Implement watchCache structure.

This commit is contained in:
Wojciech Tyczynski
2015-07-28 08:26:53 +02:00
parent f53e0ff5a8
commit 52e3af4e93
4 changed files with 453 additions and 10 deletions

View File

@@ -135,13 +135,13 @@ outer:
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run starts a goroutine and returns immediately.
func (r *Reflector) Run() {
go util.Forever(func() { r.listAndWatch(util.NeverStop) }, r.period)
go util.Forever(func() { r.ListAndWatch(util.NeverStop) }, r.period)
}
// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
go util.Until(func() { r.listAndWatch(stopCh) }, r.period, stopCh)
go util.Until(func() { r.ListAndWatch(stopCh) }, r.period, stopCh)
}
var (
@@ -170,7 +170,7 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
return t.C, t.Stop
}
func (r *Reflector) listAndWatch(stopCh <-chan struct{}) {
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) {
var resourceVersion string
resyncCh, cleanup := r.resyncChan()
defer cleanup()
@@ -191,7 +191,7 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) {
util.HandleError(fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err))
return
}
if err := r.syncWith(items); err != nil {
if err := r.syncWith(items, resourceVersion); err != nil {
util.HandleError(fmt.Errorf("%s: Unable to sync list result: %v", r.name, err))
return
}
@@ -232,12 +232,16 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) {
}
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object) error {
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
myStore, ok := r.store.(*WatchCache)
if ok {
return myStore.ReplaceWithVersion(found, resourceVersion)
}
return r.store.Replace(found)
}