add watch bookmark

Kubernetes-commit: d70edd3d39d4430d71c4b7c9adba8df5ba7f16c8
This commit is contained in:
fansong.cfs 2019-03-19 18:16:23 +08:00 committed by Kubernetes Publisher
parent b7bf0a35f1
commit 94d52886d7
5 changed files with 14 additions and 3 deletions

View File

@ -54,7 +54,7 @@ func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) {
return "", nil, fmt.Errorf("unable to decode to metav1.Event")
}
switch got.Type {
case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error):
case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error), string(watch.Bookmark):
default:
return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
}

View File

@ -42,7 +42,7 @@ func getDecoder() runtime.Decoder {
}
func TestDecoder(t *testing.T) {
table := []watch.EventType{watch.Added, watch.Deleted, watch.Modified, watch.Error}
table := []watch.EventType{watch.Added, watch.Deleted, watch.Modified, watch.Error, watch.Bookmark}
for _, eventType := range table {
out, in := io.Pipe()

View File

@ -56,6 +56,10 @@ func TestEncodeDecodeRoundTrip(t *testing.T) {
watch.Deleted,
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
watch.Bookmark,
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
}
for i, testCase := range testCases {
buf := &bytes.Buffer{}

View File

@ -271,6 +271,11 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// We want to avoid situations of hanging watchers. Stop any wachers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
// Disabled in Alpha release of watch bookmarks feature.
AllowWatchBookmarks: false,
}
w, err := r.listerWatcher.Watch(options)
@ -368,6 +373,8 @@ loop:
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}

View File

@ -153,7 +153,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
// We need to inspect the event and get ResourceVersion out of it
switch event.Type {
case watch.Added, watch.Modified, watch.Deleted:
case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
metaObject, ok := event.Object.(resourceVersionGetter)
if !ok {
_ = rw.send(watch.Event{