Merge pull request #94364 from wojtek-t/efficient_watch_resumption

Efficient watch resumption

Kubernetes-commit: 0f39af90ed39794ceea426aa0f77de67b1392308
This commit is contained in:
Kubernetes Publisher 2020-09-25 15:42:48 -07:00
commit a0a9b7d9d5
2 changed files with 68 additions and 0 deletions

View File

@ -101,6 +101,15 @@ type Reflector struct {
watchErrorHandler WatchErrorHandler watchErrorHandler WatchErrorHandler
} }
// ResourceVersionUpdater is an interface that allows store implementation to
// track the current resource version of the reflector. This is especially
// important if storage bookmarks are enabled.
type ResourceVersionUpdater interface {
// UpdateResourceVersion is called each time current resource version of the reflector
// is updated.
UpdateResourceVersion(resourceVersion string)
}
// The WatchErrorHandler is called whenever ListAndWatch drops the // The WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer // connection with an error. After calling this handler, the informer
// will backoff and retry. // will backoff and retry.
@ -507,6 +516,9 @@ loop:
} }
*resourceVersion = newResourceVersion *resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion) r.setLastSyncResourceVersion(newResourceVersion)
if rvu, ok := r.store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(newResourceVersion)
}
eventCount++ eventCount++
} }
} }

View File

@ -910,3 +910,59 @@ func TestReflectorSetExpectedType(t *testing.T) {
}) })
} }
} }
type storeWithRV struct {
Store
// resourceVersions tracks values passed by UpdateResourceVersion
resourceVersions []string
}
func (s *storeWithRV) UpdateResourceVersion(resourceVersion string) {
s.resourceVersions = append(s.resourceVersions, resourceVersion)
}
func newStoreWithRV() *storeWithRV {
return &storeWithRV{
Store: NewStore(MetaNamespaceKeyFunc),
}
}
func TestReflectorResourceVersionUpdate(t *testing.T) {
s := newStoreWithRV()
stopCh := make(chan struct{})
fw := watch.NewFake()
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
makePod := func(rv string) *v1.Pod {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: rv}}
}
go func() {
fw.Action(watch.Added, makePod("10"))
fw.Action(watch.Modified, makePod("20"))
fw.Action(watch.Bookmark, makePod("30"))
fw.Action(watch.Deleted, makePod("40"))
close(stopCh)
}()
// Initial list should use RV=0
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}
expectedRVs := []string{"10", "20", "30", "40"}
if !reflect.DeepEqual(s.resourceVersions, expectedRVs) {
t.Errorf("Expected series of resource version updates of %#v but got: %#v", expectedRVs, s.resourceVersions)
}
}