Merge pull request #6074 from derekwaynecarr/reflector_last_sync_resource_version

Expose last sync resource version on reflector
This commit is contained in:
Vish Kannan 2015-04-08 14:56:33 -07:00
commit 094cbcc6fe
2 changed files with 17 additions and 0 deletions

View File

@ -51,6 +51,10 @@ type Reflector struct {
// the beginning of the next one. // the beginning of the next one.
period time.Duration period time.Duration
resyncPeriod time.Duration resyncPeriod time.Duration
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is not thread safe as it is not synchronized with access to the store
lastSyncResourceVersion string
} }
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
@ -130,6 +134,7 @@ func (r *Reflector) listAndWatch() {
glog.Errorf("Unable to sync list result: %v", err) glog.Errorf("Unable to sync list result: %v", err)
return return
} }
r.lastSyncResourceVersion = resourceVersion
for { for {
w, err := r.listerWatcher.Watch(resourceVersion) w, err := r.listerWatcher.Watch(resourceVersion)
@ -203,6 +208,7 @@ loop:
glog.Errorf("unable to understand watch event %#v", event) glog.Errorf("unable to understand watch event %#v", event)
} }
*resourceVersion = meta.ResourceVersion() *resourceVersion = meta.ResourceVersion()
r.lastSyncResourceVersion = *resourceVersion
eventCount++ eventCount++
} }
} }
@ -215,3 +221,9 @@ loop:
glog.V(4).Infof("Watch close - %v total %v items received", r.expectedType, eventCount) glog.V(4).Infof("Watch close - %v total %v items received", r.expectedType, eventCount)
return nil return nil
} }
// LastSyncResourceVersion is the resource version observed when last sync with the underlying store
// The value returned is not synchronized with access to the underlying store and is not thread-safe
func (r *Reflector) LastSyncResourceVersion() string {
return r.lastSyncResourceVersion
}

View File

@ -112,6 +112,11 @@ func TestReflector_watchHandler(t *testing.T) {
if e, a := "32", resumeRV; e != a { if e, a := "32", resumeRV; e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
// last sync resource version should be the last version synced with store
if e, a := "32", g.LastSyncResourceVersion(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
} }
func TestReflector_watchHandlerTimeout(t *testing.T) { func TestReflector_watchHandlerTimeout(t *testing.T) {