diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 3948129c578..8620c9fcb73 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -415,6 +415,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { // We don't want to terminate all watchers as recreating all watchers puts high load on api-server. // In most of the cases, leader is reelected within few cycles. reflector.MaxInternalErrorRetryDuration = time.Second * 30 + // since the watch-list is provided by the watch cache instruct + // the reflector to issue a regular LIST against the store + reflector.UseWatchList = false cacher.watchCache = watchCache cacher.reflector = reflector diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index 466a708e9a6..eaed229569d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -18,6 +18,7 @@ package cache import ( "errors" + "os" "sync" "time" @@ -147,6 +148,9 @@ func (c *controller) Run(stopCh <-chan struct{}) { if c.config.WatchErrorHandler != nil { r.watchErrorHandler = c.config.WatchErrorHandler } + if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 { + r.UseWatchList = true + } c.reflectorMutex.Lock() c.reflector = r diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index e0ac732ee9a..7363ce35e6e 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -41,6 +41,7 @@ import ( "k8s.io/client-go/tools/pager" "k8s.io/klog/v2" "k8s.io/utils/clock" + "k8s.io/utils/pointer" "k8s.io/utils/trace" ) @@ -99,6 +100,15 @@ type Reflector struct { ShouldResync func() bool // MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch. MaxInternalErrorRetryDuration time.Duration + // UseWatchList if turned on instructs the reflector to open a stream to bring data from the API server. + // Streaming has the primary advantage of using fewer server's resources to fetch data. + // + // The old behaviour establishes a LIST request which gets data in chunks. + // Paginated list is less efficient and depending on the actual size of objects + // might result in an increased memory consumption of the APIServer. + // + // See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details + UseWatchList bool } // ResourceVersionUpdater is an interface that allows store implementation to @@ -311,17 +321,39 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { // It returns error if ListAndWatch didn't even try to initialize watch. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name) + var err error + var w watch.Interface + fallbackToList := !r.UseWatchList - err := r.list(stopCh) - if err != nil { - return err + if r.UseWatchList { + w, err = r.watchList(stopCh) + if w == nil && err == nil { + // stopCh was closed + return nil + } + if err != nil { + if !apierrors.IsInvalid(err) { + return err + } + klog.Warning("the watch-list feature is not supported by the server, falling back to the previous LIST/WATCH semantic") + fallbackToList = true + // Ensure that we won't accidentally pass some garbage down the watch. + w = nil + } + } + + if fallbackToList { + err = r.list(stopCh) + if err != nil { + return err + } } resyncerrc := make(chan error, 1) cancelCh := make(chan struct{}) defer close(cancelCh) go r.startResync(stopCh, cancelCh, resyncerrc) - return r.watch(nil, stopCh, resyncerrc) + return r.watch(w, stopCh, resyncerrc) } // startResync periodically calls r.store.Resync() method. @@ -392,8 +424,9 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc } } - err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh) + err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh) // Ensure that watch will not be reused across iterations. + w.Stop() w = nil retry.After(err) if err != nil { @@ -528,6 +561,114 @@ func (r *Reflector) list(stopCh <-chan struct{}) error { return nil } +// watchList establishes a stream to get a consistent snapshot of data +// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal +// +// case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan) +// Establishes a consistent stream with the server. +// That means the returned data is consistent, as if, served directly from etcd via a quorum read. +// It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion. +// It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion. +// After receiving a "Bookmark" event the reflector is considered to be synchronized. +// It replaces its internal store with the collected items and +// reuses the current watch requests for getting further events. +// +// case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan) +// Establishes a stream with the server at the provided resource version. +// To establish the initial state the server begins with synthetic "Added" events. +// It ends with a synthetic "Bookmark" event containing the provided or newer resource version. +// After receiving a "Bookmark" event the reflector is considered to be synchronized. +// It replaces its internal store with the collected items and +// reuses the current watch requests for getting further events. +func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { + var w watch.Interface + var err error + var temporaryStore Store + var resourceVersion string + // TODO(#115478): see if this function could be turned + // into a method and see if error handling + // could be unified with the r.watch method + isErrorRetriableWithSideEffectsFn := func(err error) bool { + if canRetry := isWatchErrorRetriable(err); canRetry { + klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err) + <-r.initConnBackoffManager.Backoff().C() + return true + } + if isExpiredError(err) || isTooLargeResourceVersionError(err) { + // we tried to re-establish a watch request but the provided RV + // has either expired or it is greater than the server knows about. + // In that case we reset the RV and + // try to get a consistent snapshot from the watch cache (case 1) + r.setIsLastSyncResourceVersionUnavailable(true) + return true + } + return false + } + + initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name}) + defer initTrace.LogIfLong(10 * time.Second) + for { + select { + case <-stopCh: + return nil, nil + default: + } + + resourceVersion = "" + lastKnownRV := r.rewatchResourceVersion() + temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc) + // TODO(#115478): large "list", slow clients, slow network, p&f + // might slow down streaming and eventually fail. + // maybe in such a case we should retry with an increased timeout? + timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) + options := metav1.ListOptions{ + ResourceVersion: lastKnownRV, + AllowWatchBookmarks: true, + SendInitialEvents: pointer.Bool(true), + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: &timeoutSeconds, + } + start := r.clock.Now() + + w, err = r.listerWatcher.Watch(options) + if err != nil { + if isErrorRetriableWithSideEffectsFn(err) { + continue + } + return nil, err + } + bookmarkReceived := pointer.Bool(false) + err = watchHandler(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription, + func(rv string) { resourceVersion = rv }, + bookmarkReceived, + r.clock, make(chan error), stopCh) + if err != nil { + w.Stop() // stop and retry with clean state + if err == errorStopRequested { + return nil, nil + } + if isErrorRetriableWithSideEffectsFn(err) { + continue + } + return nil, err + } + if *bookmarkReceived { + break + } + } + // We successfully got initial state from watch-list confirmed by the + // "k8s.io/initial-events-end" bookmark. + initTrace.Step("Objects streamed", trace.Field{Key: "count", Value: len(temporaryStore.List())}) + r.setIsLastSyncResourceVersionUnavailable(false) + if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { + return nil, fmt.Errorf("unable to sync watch-list result: %v", err) + } + initTrace.Step("SyncWith done") + r.setLastSyncResourceVersion(resourceVersion) + + return w, nil +} + // syncWith replaces the store's items with the given list. func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error { found := make([]interface{}, 0, len(items)) @@ -546,15 +687,17 @@ func watchHandler(start time.Time, name string, expectedTypeName string, setLastSyncResourceVersion func(string), + exitOnInitialEventsEndBookmark *bool, clock clock.Clock, errc chan error, stopCh <-chan struct{}, ) error { eventCount := 0 - - // Stopping the watcher should be idempotent and if we return from this function there's no way - // we're coming back in with the same watch interface. - defer w.Stop() + if exitOnInitialEventsEndBookmark != nil { + // set it to false just in case somebody + // made it positive + *exitOnInitialEventsEndBookmark = false + } loop: for { @@ -609,6 +752,11 @@ loop: } case watch.Bookmark: // A `Bookmark` means watch has synced here, just update the resourceVersion + if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok { + if exitOnInitialEventsEndBookmark != nil { + *exitOnInitialEventsEndBookmark = true + } + } default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) } @@ -617,6 +765,11 @@ loop: rvu.UpdateResourceVersion(resourceVersion) } eventCount++ + if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark { + watchDuration := clock.Since(start) + klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration) + return nil + } } } @@ -665,6 +818,18 @@ func (r *Reflector) relistResourceVersion() string { return r.lastSyncResourceVersion } +// rewatchResourceVersion determines the resource version the reflector should start streaming from. +func (r *Reflector) rewatchResourceVersion() string { + r.lastSyncResourceVersionMutex.RLock() + defer r.lastSyncResourceVersionMutex.RUnlock() + if r.isLastSyncResourceVersionUnavailable { + // initial stream should return data at the most recent resource version. + // the returned data must be consistent i.e. as if served from etcd via a quorum read + return "" + } + return r.lastSyncResourceVersion +} + // setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned // "expired" or "too large resource version" error. func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) { diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index d772a5d286d..2b7ee789d5f 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -138,7 +138,7 @@ func TestReflectorWatchHandlerError(t *testing.T) { go func() { fw.Stop() }() - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) + err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop) if err == nil { t.Errorf("unexpected non-error") } @@ -157,7 +157,7 @@ func TestReflectorWatchHandler(t *testing.T) { fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) fw.Stop() }() - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) + err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop) if err != nil { t.Errorf("unexpected error %v", err) } @@ -205,7 +205,7 @@ func TestReflectorStopWatch(t *testing.T) { fw := watch.NewFake() stopWatch := make(chan struct{}, 1) stopWatch <- struct{}{} - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopWatch) + err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopWatch) if err != errorStopRequested { t.Errorf("expected stop error, got %q", err) } diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_watchlist_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_watchlist_test.go new file mode 100644 index 00000000000..ae1750c7bb7 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_watchlist_test.go @@ -0,0 +1,518 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + "sort" + "sync" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/utils/pointer" +) + +func TestWatchList(t *testing.T) { + scenarios := []struct { + name string + disableUseWatchList bool + + // closes listWatcher after sending the specified number of watch events + closeAfterWatchEvents int + // closes listWatcher after getting the specified number of watch requests + closeAfterWatchRequests int + // closes listWatcher after getting the specified number of list requests + closeAfterListRequests int + + // stops Watcher after sending the specified number of watch events + stopAfterWatchEvents int + + watchOptionsPredicate func(options metav1.ListOptions) error + watchEvents []watch.Event + podList *v1.PodList + + expectedRequestOptions []metav1.ListOptions + expectedWatchRequests int + expectedListRequests int + expectedStoreContent []v1.Pod + expectedError error + }{ + { + name: "the reflector won't be synced if the bookmark event has been received", + watchEvents: []watch.Event{{Type: watch.Added, Object: makePod("p1", "1")}}, + closeAfterWatchEvents: 1, + expectedWatchRequests: 1, + expectedRequestOptions: []metav1.ListOptions{{ + SendInitialEvents: pointer.Bool(true), + AllowWatchBookmarks: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: pointer.Int64(1), + }}, + }, + { + name: "the reflector uses the old LIST/WATCH semantics if the UseWatchList is turned off", + disableUseWatchList: true, + closeAfterWatchRequests: 1, + podList: &v1.PodList{ + ListMeta: metav1.ListMeta{ResourceVersion: "1"}, + Items: []v1.Pod{*makePod("p1", "1")}, + }, + expectedWatchRequests: 1, + expectedListRequests: 1, + expectedRequestOptions: []metav1.ListOptions{ + { + ResourceVersion: "0", + Limit: 500, + }, + { + AllowWatchBookmarks: true, + ResourceVersion: "1", + TimeoutSeconds: pointer.Int64(1), + }}, + expectedStoreContent: []v1.Pod{*makePod("p1", "1")}, + }, + { + name: "returning any other error than apierrors.NewInvalid stops the reflector and reports the error", + watchOptionsPredicate: func(options metav1.ListOptions) error { + return fmt.Errorf("dummy error") + }, + expectedError: fmt.Errorf("dummy error"), + expectedWatchRequests: 1, + expectedRequestOptions: []metav1.ListOptions{{ + SendInitialEvents: pointer.Bool(true), + AllowWatchBookmarks: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: pointer.Int64(1), + }}, + }, + { + name: "the reflector can fall back to old LIST/WATCH semantics when a server doesn't support streaming", + watchOptionsPredicate: func(options metav1.ListOptions) error { + if options.SendInitialEvents != nil && *options.SendInitialEvents { + return apierrors.NewInvalid(schema.GroupKind{}, "streaming is not allowed", nil) + } + return nil + }, + podList: &v1.PodList{ + ListMeta: metav1.ListMeta{ResourceVersion: "1"}, + Items: []v1.Pod{*makePod("p1", "1")}, + }, + closeAfterWatchEvents: 1, + watchEvents: []watch.Event{{Type: watch.Added, Object: makePod("p2", "2")}}, + expectedWatchRequests: 2, + expectedListRequests: 1, + expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")}, + expectedRequestOptions: []metav1.ListOptions{ + { + SendInitialEvents: pointer.Bool(true), + AllowWatchBookmarks: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: pointer.Int64(1), + }, + { + ResourceVersion: "0", + Limit: 500, + }, + { + AllowWatchBookmarks: true, + ResourceVersion: "1", + TimeoutSeconds: pointer.Int64(1), + }, + }, + }, + { + name: "prove that the reflector is synced after receiving a bookmark event", + closeAfterWatchEvents: 3, + watchEvents: []watch.Event{ + {Type: watch.Added, Object: makePod("p1", "1")}, + {Type: watch.Added, Object: makePod("p2", "2")}, + {Type: watch.Bookmark, Object: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + }, + expectedWatchRequests: 1, + expectedRequestOptions: []metav1.ListOptions{{ + SendInitialEvents: pointer.Bool(true), + AllowWatchBookmarks: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: pointer.Int64(1), + }}, + expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")}, + }, + { + name: "check if Updates and Deletes events are propagated during streaming (until the bookmark is received)", + closeAfterWatchEvents: 6, + watchEvents: []watch.Event{ + {Type: watch.Added, Object: makePod("p1", "1")}, + {Type: watch.Added, Object: makePod("p2", "2")}, + {Type: watch.Modified, Object: func() runtime.Object { + p1 := makePod("p1", "3") + p1.Spec.ActiveDeadlineSeconds = pointer.Int64(12) + return p1 + }()}, + {Type: watch.Added, Object: makePod("p3", "4")}, + {Type: watch.Deleted, Object: makePod("p3", "5")}, + {Type: watch.Bookmark, Object: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "5", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + }, + expectedWatchRequests: 1, + expectedRequestOptions: []metav1.ListOptions{{ + SendInitialEvents: pointer.Bool(true), + AllowWatchBookmarks: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: pointer.Int64(1), + }}, + expectedStoreContent: []v1.Pod{ + *makePod("p2", "2"), + func() v1.Pod { + p1 := *makePod("p1", "3") + p1.Spec.ActiveDeadlineSeconds = pointer.Int64(12) + return p1 + }(), + }, + }, + { + name: "checks if the reflector retries 429", + watchOptionsPredicate: func() func(options metav1.ListOptions) error { + counter := 1 + return func(options metav1.ListOptions) error { + if counter < 3 { + counter++ + return apierrors.NewTooManyRequests("busy, check again later", 1) + } + return nil + } + }(), + closeAfterWatchEvents: 2, + watchEvents: []watch.Event{ + {Type: watch.Added, Object: makePod("p1", "1")}, + {Type: watch.Bookmark, Object: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + }, + expectedWatchRequests: 3, + expectedRequestOptions: []metav1.ListOptions{ + { + SendInitialEvents: pointer.Bool(true), + AllowWatchBookmarks: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: pointer.Int64(1), + }, + { + SendInitialEvents: pointer.Bool(true), + AllowWatchBookmarks: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: pointer.Int64(1), + }, + { + SendInitialEvents: pointer.Bool(true), + AllowWatchBookmarks: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: pointer.Int64(1), + }, + }, + expectedStoreContent: []v1.Pod{*makePod("p1", "1")}, + }, + { + name: "check if stopping a watcher before sync results in creating a new watch-list request", + stopAfterWatchEvents: 1, + closeAfterWatchEvents: 3, + watchEvents: []watch.Event{ + {Type: watch.Added, Object: makePod("p1", "1")}, + // second request + {Type: watch.Added, Object: makePod("p1", "1")}, + {Type: watch.Bookmark, Object: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + }, + expectedWatchRequests: 2, + expectedRequestOptions: []metav1.ListOptions{ + { + SendInitialEvents: pointer.Bool(true), + AllowWatchBookmarks: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: pointer.Int64(1), + }, + { + SendInitialEvents: pointer.Bool(true), + AllowWatchBookmarks: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: pointer.Int64(1), + }, + }, + expectedStoreContent: []v1.Pod{*makePod("p1", "1")}, + }, + { + name: "stopping a watcher after synchronization results in creating a new watch request", + stopAfterWatchEvents: 4, + closeAfterWatchEvents: 5, + watchEvents: []watch.Event{ + {Type: watch.Added, Object: makePod("p1", "1")}, + {Type: watch.Added, Object: makePod("p2", "2")}, + {Type: watch.Bookmark, Object: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + {Type: watch.Added, Object: makePod("p3", "3")}, + // second request + {Type: watch.Added, Object: makePod("p4", "4")}, + }, + expectedWatchRequests: 2, + expectedRequestOptions: []metav1.ListOptions{ + { + SendInitialEvents: pointer.Bool(true), + AllowWatchBookmarks: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: pointer.Int64(1), + }, + { + AllowWatchBookmarks: true, + ResourceVersion: "3", + TimeoutSeconds: pointer.Int64(1), + }, + }, + expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3"), *makePod("p4", "4")}, + }, + { + name: "expiring an established watcher results in returning an error from the reflector", + watchOptionsPredicate: func() func(options metav1.ListOptions) error { + counter := 0 + return func(options metav1.ListOptions) error { + counter++ + if counter == 2 { + return apierrors.NewResourceExpired("rv already expired") + } + return nil + } + }(), + stopAfterWatchEvents: 3, + watchEvents: []watch.Event{ + {Type: watch.Added, Object: makePod("p1", "1")}, + {Type: watch.Bookmark, Object: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + {Type: watch.Added, Object: makePod("p3", "3")}, + }, + expectedWatchRequests: 2, + expectedRequestOptions: []metav1.ListOptions{ + { + SendInitialEvents: pointer.Bool(true), + AllowWatchBookmarks: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: pointer.Int64(1), + }, + { + AllowWatchBookmarks: true, + ResourceVersion: "3", + TimeoutSeconds: pointer.Int64(1), + }, + }, + expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p3", "3")}, + expectedError: apierrors.NewResourceExpired("rv already expired"), + }, + } + for _, s := range scenarios { + t.Run(s.name, func(t *testing.T) { + scenario := s // capture as local variable + listWatcher, store, reflector, stopCh := testData() + go func() { + for i, e := range scenario.watchEvents { + listWatcher.fakeWatcher.Action(e.Type, e.Object) + if i+1 == scenario.stopAfterWatchEvents { + listWatcher.StopAndRecreateWatch() + continue + } + if i+1 == scenario.closeAfterWatchEvents { + close(stopCh) + } + } + }() + listWatcher.watchOptionsPredicate = scenario.watchOptionsPredicate + listWatcher.closeAfterWatchRequests = scenario.closeAfterWatchRequests + listWatcher.customListResponse = scenario.podList + listWatcher.closeAfterListRequests = scenario.closeAfterListRequests + if scenario.disableUseWatchList { + reflector.UseWatchList = false + } + + err := reflector.ListAndWatch(stopCh) + if scenario.expectedError != nil && err == nil { + t.Fatalf("expected error %q, got nil", scenario.expectedError) + } + if scenario.expectedError == nil && err != nil { + t.Fatalf("unexpected error: %v", err) + } + if scenario.expectedError != nil && err.Error() != scenario.expectedError.Error() { + t.Fatalf("expected error %q, got %q", scenario.expectedError, err.Error()) + } + + verifyWatchCounter(t, listWatcher, scenario.expectedWatchRequests) + verifyListCounter(t, listWatcher, scenario.expectedListRequests) + verifyRequestOptions(t, listWatcher, scenario.expectedRequestOptions) + verifyStore(t, store, scenario.expectedStoreContent) + }) + } +} + +func verifyRequestOptions(t *testing.T, lw *fakeListWatcher, expectedRequestOptions []metav1.ListOptions) { + if len(lw.requestOptions) != len(expectedRequestOptions) { + t.Fatalf("expected to receive exactly %v requests, got %v", len(expectedRequestOptions), len(lw.requestOptions)) + } + + for index, expectedRequestOption := range expectedRequestOptions { + actualRequestOption := lw.requestOptions[index] + if actualRequestOption.TimeoutSeconds == nil && expectedRequestOption.TimeoutSeconds != nil { + t.Fatalf("expected the request to specify TimeoutSeconds option but it didn't, actual = %#v, expected = %#v", actualRequestOption, expectedRequestOption) + } + if actualRequestOption.TimeoutSeconds != nil && expectedRequestOption.TimeoutSeconds == nil { + t.Fatalf("unexpected TimeoutSeconds option specified, actual = %#v, expected = %#v", actualRequestOption, expectedRequestOption) + } + // ignore actual values + actualRequestOption.TimeoutSeconds = nil + expectedRequestOption.TimeoutSeconds = nil + if !cmp.Equal(actualRequestOption, expectedRequestOption) { + t.Fatalf("expected %#v, got %#v", expectedRequestOption, actualRequestOption) + } + } +} + +func verifyListCounter(t *testing.T, lw *fakeListWatcher, expectedListCounter int) { + if lw.listCounter != expectedListCounter { + t.Fatalf("unexpected number of LIST requests, got: %v, expected: %v", lw.listCounter, expectedListCounter) + } +} + +func verifyWatchCounter(t *testing.T, lw *fakeListWatcher, expectedWatchCounter int) { + if lw.watchCounter != expectedWatchCounter { + t.Fatalf("unexpected number of WATCH requests, got: %v, expected: %v", lw.watchCounter, expectedWatchCounter) + } +} + +type byName []v1.Pod + +func (a byName) Len() int { return len(a) } +func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name } +func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func verifyStore(t *testing.T, s Store, expectedPods []v1.Pod) { + rawPods := s.List() + actualPods := []v1.Pod{} + for _, p := range rawPods { + actualPods = append(actualPods, *p.(*v1.Pod)) + } + + sort.Sort(byName(actualPods)) + sort.Sort(byName(expectedPods)) + if !cmp.Equal(actualPods, expectedPods, cmpopts.EquateEmpty()) { + t.Fatalf("unexpected store content, diff: %s", cmp.Diff(actualPods, expectedPods)) + } +} + +func makePod(name, rv string) *v1.Pod { + return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: rv}} +} + +func testData() (*fakeListWatcher, Store, *Reflector, chan struct{}) { + s := NewStore(MetaNamespaceKeyFunc) + stopCh := make(chan struct{}) + lw := &fakeListWatcher{ + fakeWatcher: watch.NewFake(), + stop: func() { + close(stopCh) + }, + } + r := NewReflector(lw, &v1.Pod{}, s, 0) + r.UseWatchList = true + + return lw, s, r, stopCh +} + +type fakeListWatcher struct { + lock sync.Mutex + fakeWatcher *watch.FakeWatcher + listCounter int + watchCounter int + closeAfterWatchRequests int + closeAfterListRequests int + stop func() + + requestOptions []metav1.ListOptions + + customListResponse *v1.PodList + watchOptionsPredicate func(options metav1.ListOptions) error +} + +func (lw *fakeListWatcher) List(options metav1.ListOptions) (runtime.Object, error) { + lw.listCounter++ + lw.requestOptions = append(lw.requestOptions, options) + if lw.listCounter == lw.closeAfterListRequests { + lw.stop() + } + if lw.customListResponse != nil { + return lw.customListResponse, nil + } + return nil, fmt.Errorf("not implemented") +} + +func (lw *fakeListWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { + lw.watchCounter++ + lw.requestOptions = append(lw.requestOptions, options) + if lw.watchCounter == lw.closeAfterWatchRequests { + lw.stop() + } + if lw.watchOptionsPredicate != nil { + if err := lw.watchOptionsPredicate(options); err != nil { + return nil, err + } + } + lw.lock.Lock() + defer lw.lock.Unlock() + return lw.fakeWatcher, nil +} + +func (lw *fakeListWatcher) StopAndRecreateWatch() { + lw.lock.Lock() + defer lw.lock.Unlock() + lw.fakeWatcher.Stop() + lw.fakeWatcher = watch.NewFake() +}