From 94d52886d7a386bb631c2477b40036eeb09c4d0d Mon Sep 17 00:00:00 2001 From: "fansong.cfs" Date: Tue, 19 Mar 2019 18:16:23 +0800 Subject: [PATCH] add watch bookmark Kubernetes-commit: d70edd3d39d4430d71c4b7c9adba8df5ba7f16c8 --- rest/watch/decoder.go | 2 +- rest/watch/decoder_test.go | 2 +- rest/watch/encoder_test.go | 4 ++++ tools/cache/reflector.go | 7 +++++++ tools/watch/retrywatcher.go | 2 +- 5 files changed, 14 insertions(+), 3 deletions(-) diff --git a/rest/watch/decoder.go b/rest/watch/decoder.go index 73bb63ad..e95c020b 100644 --- a/rest/watch/decoder.go +++ b/rest/watch/decoder.go @@ -54,7 +54,7 @@ func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) { return "", nil, fmt.Errorf("unable to decode to metav1.Event") } switch got.Type { - case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error): + case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error), string(watch.Bookmark): default: return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type) } diff --git a/rest/watch/decoder_test.go b/rest/watch/decoder_test.go index bfc58130..2e02cf75 100644 --- a/rest/watch/decoder_test.go +++ b/rest/watch/decoder_test.go @@ -42,7 +42,7 @@ func getDecoder() runtime.Decoder { } func TestDecoder(t *testing.T) { - table := []watch.EventType{watch.Added, watch.Deleted, watch.Modified, watch.Error} + table := []watch.EventType{watch.Added, watch.Deleted, watch.Modified, watch.Error, watch.Bookmark} for _, eventType := range table { out, in := io.Pipe() diff --git a/rest/watch/encoder_test.go b/rest/watch/encoder_test.go index 577b7010..b56ce432 100644 --- a/rest/watch/encoder_test.go +++ b/rest/watch/encoder_test.go @@ -56,6 +56,10 @@ func TestEncodeDecodeRoundTrip(t *testing.T) { watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, }, + { + watch.Bookmark, + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, } for i, testCase := range testCases { buf := &bytes.Buffer{} diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 023fd5e8..4b5daeed 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -271,6 +271,11 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // We want to avoid situations of hanging watchers. Stop any wachers that do not // receive any events within the timeout window. TimeoutSeconds: &timeoutSeconds, + // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks. + // Reflector doesn't assume bookmarks are returned at all (if the server do not support + // watch bookmarks, it will ignore this field). + // Disabled in Alpha release of watch bookmarks feature. + AllowWatchBookmarks: false, } w, err := r.listerWatcher.Watch(options) @@ -368,6 +373,8 @@ loop: if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) } + case watch.Bookmark: + // A `Bookmark` means watch has synced here, just update the resourceVersion default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) } diff --git a/tools/watch/retrywatcher.go b/tools/watch/retrywatcher.go index e45d58ec..47ae9df4 100644 --- a/tools/watch/retrywatcher.go +++ b/tools/watch/retrywatcher.go @@ -153,7 +153,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { // We need to inspect the event and get ResourceVersion out of it switch event.Type { - case watch.Added, watch.Modified, watch.Deleted: + case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark: metaObject, ok := event.Object.(resourceVersionGetter) if !ok { _ = rw.send(watch.Event{