diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go index fbcc4d6404c..69cc9f7e852 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go @@ -19,8 +19,10 @@ package cacher import ( "context" "fmt" + "sync" "time" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -32,6 +34,21 @@ import ( "k8s.io/klog/v2" ) +// possible states of the cache watcher +const ( + // cacheWatcherWaitingForBookmark indicates the cacher + // is waiting for a bookmark event with a specific RV set + cacheWatcherWaitingForBookmark = iota + + // cacheWatcherBookmarkReceived indicates that the cacher + // has received a bookmark event with required RV + cacheWatcherBookmarkReceived + + // cacheWatcherBookmarkSent indicates that the cacher + // has already sent a bookmark event to a client + cacheWatcherBookmarkSent +) + // cacheWatcher implements watch.Interface // this is not thread-safe type cacheWatcher struct { @@ -55,6 +72,20 @@ type cacheWatcher struct { // drainInputBuffer indicates whether we should delay closing this watcher // and send all event in the input buffer. drainInputBuffer bool + + // bookmarkAfterResourceVersion holds an RV that indicates + // when we should start delivering bookmark events. + // If this field holds the value of 0 that means + // we don't have any special preferences toward delivering bookmark events. + // Note that this field is used in conjunction with the state field. + // It should not be changed once the watcher has been started. + bookmarkAfterResourceVersion uint64 + + // stateMutex protects state + stateMutex sync.Mutex + + // state holds a numeric value indicating the current state of the watcher + state int } func newCacheWatcher( @@ -115,8 +146,17 @@ func (c *cacheWatcher) stopLocked() { } func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool { + // if the bookmarkAfterResourceVersion hasn't been seen + // we will try to deliver a bookmark event every second. + // the following check will discard a bookmark event + // if it is < than the bookmarkAfterResourceVersion + // so that we don't pollute the input channel + if event.Type == watch.Bookmark && event.ResourceVersion < c.bookmarkAfterResourceVersion { + return false + } select { case c.input <- event: + c.markBookmarkAfterRvAsReceived(event) return true default: return false @@ -124,6 +164,9 @@ func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool { } // Nil timer means that add will not block (if it can't send event immediately, it will break the watcher) +// +// Note that bookmark events are never added via the add method only via the nonblockingAdd. +// Changing this behaviour will require moving the markBookmarkAfterRvAsReceived method func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { // Try to send the event immediately, without blocking. if c.nonblockingAdd(event) { @@ -136,7 +179,31 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { // we simply terminate it. klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. 
len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result))
 		metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc()
-		c.forget(false)
+		// This means that we couldn't send an event to that watcher.
+		// Since we don't want to block on it infinitely, we simply terminate it.
+
+		// we are graceful = false, when:
+		//
+		// (a) The bookmarkAfterResourceVersion hasn't been received,
+		//     so we can safely terminate the watcher. The client is waiting
+		//     for this specific bookmark, and we haven't even received one.
+		// (b) We have seen the bookmarkAfterResourceVersion, and it has already been sent to the client.
+		//     We can simply terminate the watcher.
+
+		// we are graceful = true, when:
+		//
+		// (a) We have seen a bookmark, but it hasn't been sent to the client yet.
+		//     That means we should drain the input buffer, which contains
+		//     the bookmarkAfterResourceVersion we want. We do that to make progress,
+		//     as clients can re-establish a new watch with the given RV and receive
+		//     further notifications.
+		graceful := func() bool {
+			c.stateMutex.Lock()
+			defer c.stateMutex.Unlock()
+			return c.state == cacheWatcherBookmarkReceived
+		}()
+		klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v, graceful = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result), graceful)
+		c.forget(graceful)
 	}
 
 	if timer == nil {
@@ -162,10 +229,20 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Du
 	//
 	// (b) roughly every minute
 	//
+	// (c) immediately when the bookmarkAfterResourceVersion hasn't been confirmed yet;
+	//     in this scenario the client has already seen (or is in the process of receiving)
+	//     all initial data and is interested in seeing
+	//     a specific RV value (i.e. the bookmarkAfterResourceVersion).
+	//     Since we don't know when the cacher will see that RV, we increase the frequency.
+	//
 	// (b) gives us periodicity if the watch breaks due to unexpected
 	// conditions, (a) ensures that on timeout the watcher is as close to
 	// now as possible - this covers 99% of cases.
+	if !c.wasBookmarkAfterRvReceived() {
+		return time.Time{}, true // schedule immediately
+	}
+
 	heartbeatTime := now.Add(bookmarkFrequency)
 	if c.deadline.IsZero() {
 		// Timeout is set by our client libraries (e.g. 
reflector) as well as defaulted by
@@ -182,6 +259,76 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Du
 	return heartbeatTime, true
 }
 
+// wasBookmarkAfterRvReceived is the same as wasBookmarkAfterRvReceivedLocked, it just acquires the lock
+func (c *cacheWatcher) wasBookmarkAfterRvReceived() bool {
+	c.stateMutex.Lock()
+	defer c.stateMutex.Unlock()
+	return c.wasBookmarkAfterRvReceivedLocked()
+}
+
+// wasBookmarkAfterRvReceivedLocked checks if the given cacheWatcher
+// has seen a bookmark event with an RV >= the bookmarkAfterResourceVersion
+func (c *cacheWatcher) wasBookmarkAfterRvReceivedLocked() bool {
+	return c.state != cacheWatcherWaitingForBookmark
+}
+
+// markBookmarkAfterRvAsReceived indicates that the given cacheWatcher
+// has seen a bookmark event with an RV >= the bookmarkAfterResourceVersion
+func (c *cacheWatcher) markBookmarkAfterRvAsReceived(event *watchCacheEvent) {
+	if event.Type == watch.Bookmark {
+		c.stateMutex.Lock()
+		defer c.stateMutex.Unlock()
+		if c.wasBookmarkAfterRvReceivedLocked() {
+			return
+		}
+		// bookmark events are scheduled by the startDispatchingBookmarkEvents method.
+		// Since we have received a bookmark event, we have converged towards the expected RV
+		// and it is okay to update the state so that this watcher
+		// can be scheduled for regular bookmark events.
+		c.state = cacheWatcherBookmarkReceived
+	}
+}
+
+// wasBookmarkAfterRvSentLocked checks if a bookmark event
+// with an RV >= the bookmarkAfterResourceVersion has been sent by this watcher
+func (c *cacheWatcher) wasBookmarkAfterRvSentLocked() bool {
+	return c.state == cacheWatcherBookmarkSent
+}
+
+// wasBookmarkAfterRvSent is the same as wasBookmarkAfterRvSentLocked, it just acquires the lock
+func (c *cacheWatcher) wasBookmarkAfterRvSent() bool {
+	c.stateMutex.Lock()
+	defer c.stateMutex.Unlock()
+	return c.wasBookmarkAfterRvSentLocked()
+}
+
+// markBookmarkAfterRvSent indicates that the given cacheWatcher
+// has sent a bookmark event with an RV >= the bookmarkAfterResourceVersion
+//
+// this function relies on the fact that the nonblockingAdd method
+// won't admit a bookmark event with an RV < the bookmarkAfterResourceVersion,
+// so the first received bookmark event is considered to match the bookmarkAfterResourceVersion
+func (c *cacheWatcher) markBookmarkAfterRvSent(event *watchCacheEvent) {
+	// note that bookmark events are not very common, so the lock is acquired only every ~60 seconds or so
+	if event.Type == watch.Bookmark {
+		c.stateMutex.Lock()
+		defer c.stateMutex.Unlock()
+		if !c.wasBookmarkAfterRvSentLocked() {
+			c.state = cacheWatcherBookmarkSent
+		}
+	}
+}
+
+// setBookmarkAfterResourceVersion sets the bookmarkAfterResourceVersion and the state associated with it
+func (c *cacheWatcher) setBookmarkAfterResourceVersion(bookmarkAfterResourceVersion uint64) {
+	state := cacheWatcherWaitingForBookmark
+	if bookmarkAfterResourceVersion == 0 {
+		state = cacheWatcherBookmarkSent // if no specific RV was requested, we treat it as a no-op
+	}
+	c.state = state
+	c.bookmarkAfterResourceVersion = bookmarkAfterResourceVersion
+}
+
 // setDrainInputBufferLocked if set to true indicates that we should delay closing this watcher
 // until we send all events residing in the input buffer. 
func (c *cacheWatcher) setDrainInputBufferLocked(drain bool) { @@ -216,7 +363,21 @@ func updateResourceVersion(object runtime.Object, versioner storage.Versioner, r func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event { if event.Type == watch.Bookmark { - return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()} + e := &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()} + if !c.wasBookmarkAfterRvSent() { + objMeta, err := meta.Accessor(e.Object) + if err != nil { + utilruntime.HandleError(fmt.Errorf("error while accessing object's metadata gr: %v, identifier: %v, obj: %#v, err: %v", c.groupResource, c.identifier, e.Object, err)) + return nil + } + objAnnotations := objMeta.GetAnnotations() + if objAnnotations == nil { + objAnnotations = map[string]string{} + } + objAnnotations["k8s.io/initial-events-end"] = "true" + objMeta.SetAnnotations(objAnnotations) + } + return e } curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields) @@ -276,6 +437,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { select { case c.result <- *watchEvent: + c.markBookmarkAfterRvSent(event) case <-c.done: } } @@ -360,7 +522,9 @@ func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) { return } // only send events newer than resourceVersion - if event.ResourceVersion > resourceVersion { + // or a bookmark event with an RV equal to resourceVersion + // if we haven't sent one to the client + if event.ResourceVersion > resourceVersion || (event.Type == watch.Bookmark && event.ResourceVersion == resourceVersion && !c.wasBookmarkAfterRvSent()) { c.sendWatchCacheEvent(event) } case <-ctx.Done(): diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go index 42b644da3f1..c183259519e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" + utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" testingclock "k8s.io/utils/clock/testing" ) @@ -291,7 +292,9 @@ func TestTimeBucketWatchersBasic(t *testing.T) { forget := func(bool) {} newWatcher := func(deadline time.Time) *cacheWatcher { - return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "") + w := newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "") + w.setBookmarkAfterResourceVersion(0) + return w } clock := testingclock.NewFakeClock(time.Now()) @@ -412,3 +415,179 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) { t.Fatalf("expected forget() to be called three times, because processInterval should call Stop(): %v", err) } } + +// TestCacheWatcherDrainingNoBookmarkAfterResourceVersionReceived verifies if the watcher will be stopped +// when adding an item times out and the bookmarkAfterResourceVersion hasn't been received +func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionReceived(t *testing.T) { + var lock sync.RWMutex + var w *cacheWatcher + count := 0 + filter := func(string, labels.Set, fields.Set) bool { return true } + forget := func(drainWatcher bool) { + lock.Lock() + defer lock.Unlock() + if drainWatcher == true { + t.Fatalf("didn't expect 
drainWatcher to be set to true") + } + count++ + w.setDrainInputBufferLocked(drainWatcher) + w.stopLocked() + } + initEvents := []*watchCacheEvent{ + {Object: &v1.Pod{}}, + {Object: &v1.Pod{}}, + } + w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") + w.setBookmarkAfterResourceVersion(10) + go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0) + if w.add(&watchCacheEvent{Object: &v1.Pod{}}, time.NewTimer(1*time.Second)) { + t.Fatal("expected the add method to fail") + } + if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { + lock.RLock() + defer lock.RUnlock() + return count == 2, nil + }); err != nil { + t.Fatalf("expected forget() to be called twice, first call from w.add() and then from w.Stop() called from w.processInterval(): %v", err) + } + + if !w.stopped { + t.Fatal("expected the watcher to be stopped but it wasn't") + } +} + +// TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent checks if the watcher's input +// channel is drained if the bookmarkAfterResourceVersion was received but not sent +func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) { + makePod := func(rv uint64) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", rv), + Namespace: "ns", + ResourceVersion: fmt.Sprintf("%d", rv), + Annotations: map[string]string{}, + }, + } + } + var lock sync.RWMutex + var w *cacheWatcher + watchInitializationSignal := utilflowcontrol.NewInitializationSignal() + ctx := utilflowcontrol.WithInitializationSignal(context.Background(), watchInitializationSignal) + count := 0 + filter := func(string, labels.Set, fields.Set) bool { return true } + forget := func(drainWatcher bool) { + lock.Lock() + defer lock.Unlock() + count++ + w.setDrainInputBufferLocked(drainWatcher) + w.stopLocked() + } + initEvents := []*watchCacheEvent{{Object: makePod(1)}, {Object: makePod(2)}} + w = newCacheWatcher(2, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") + w.setBookmarkAfterResourceVersion(10) + go w.processInterval(ctx, intervalFromEvents(initEvents), 0) + watchInitializationSignal.Wait() + + // note that we can add three events even though the chanSize is two because + // one event has been popped off from the input chan + if !w.add(&watchCacheEvent{Object: makePod(5), ResourceVersion: 5}, time.NewTimer(1*time.Second)) { + t.Fatal("failed adding an even to the watcher") + } + if !w.nonblockingAdd(&watchCacheEvent{Type: watch.Bookmark, ResourceVersion: 10, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "10"}}}) { + t.Fatal("failed adding an even to the watcher") + } + if !w.add(&watchCacheEvent{Object: makePod(15), ResourceVersion: 15}, time.NewTimer(1*time.Second)) { + t.Fatal("failed adding an even to the watcher") + } + if w.add(&watchCacheEvent{Object: makePod(20), ResourceVersion: 20}, time.NewTimer(1*time.Second)) { + t.Fatal("expected the add method to fail") + } + if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { + lock.RLock() + defer lock.RUnlock() + return count == 1, nil + }); err != nil { + t.Fatalf("expected forget() to be called once, just from the w.add() method: %v", err) + } + + if !w.stopped { + t.Fatal("expected the watcher to be stopped but it wasn't") + } + verifyEvents(t, w, []watch.Event{ + {Type: watch.Added, Object: makePod(1)}, + {Type: watch.Added, Object: makePod(2)}, + {Type: watch.Added, 
Object: makePod(5)}, + {Type: watch.Bookmark, Object: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "10", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + {Type: watch.Added, Object: makePod(15)}, + }, true) + + if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { + lock.RLock() + defer lock.RUnlock() + return count == 2, nil + }); err != nil { + t.Fatalf("expected forget() to be called twice, the second call is from w.Stop() method called from w.processInterval(): %v", err) + } +} + +func TestBookmarkAfterResourceVersionWatchers(t *testing.T) { + newWatcher := func(id string, deadline time.Time) *cacheWatcher { + w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set) bool { return true }, func(bool) {}, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id) + w.setBookmarkAfterResourceVersion(10) + return w + } + + clock := testingclock.NewFakeClock(time.Now()) + target := newTimeBucketWatchers(clock, defaultBookmarkFrequency) + if !target.addWatcher(newWatcher("1", clock.Now().Add(2*time.Minute))) { + t.Fatal("failed adding an even to the watcher") + } + + // the watcher is immediately expired (it's waiting for bookmark, so it is scheduled immediately) + ret := target.popExpiredWatchers() + if len(ret) != 1 || len(ret[0]) != 1 { + t.Fatalf("expected only one watcher to be expired") + } + if !target.addWatcher(ret[0][0]) { + t.Fatal("failed adding an even to the watcher") + } + + // after one second time the watcher is still expired + clock.Step(1 * time.Second) + ret = target.popExpiredWatchers() + if len(ret) != 1 || len(ret[0]) != 1 { + t.Fatalf("expected only one watcher to be expired") + } + if !target.addWatcher(ret[0][0]) { + t.Fatal("failed adding an even to the watcher") + } + + // after 29 seconds the watcher is still expired + clock.Step(29 * time.Second) + ret = target.popExpiredWatchers() + if len(ret) != 1 || len(ret[0]) != 1 { + t.Fatalf("expected only one watcher to be expired") + } + + // after confirming the watcher is not expired immediately + ret[0][0].markBookmarkAfterRvAsReceived(&watchCacheEvent{Type: watch.Bookmark, ResourceVersion: 10, Object: &v1.Pod{}}) + if !target.addWatcher(ret[0][0]) { + t.Fatal("failed adding an even to the watcher") + } + clock.Step(30 * time.Second) + ret = target.popExpiredWatchers() + if len(ret) != 0 { + t.Fatalf("didn't expect any watchers to be expired") + } + + clock.Step(30 * time.Second) + ret = target.popExpiredWatchers() + if len(ret) != 1 || len(ret[0]) != 1 { + t.Fatalf("expected only one watcher to be expired") + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 7e4ddcf66e2..9f1cfcbdaea 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "reflect" + "strconv" "sync" "time" @@ -34,7 +35,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/audit" @@ -290,6 +290,9 @@ type Cacher struct { // newFunc is a function that creates new empty object storing a object of type Type. newFunc func() runtime.Object + // newListFunc is a function that creates new empty list for storing objects of type Type. 
+ newListFunc func() runtime.Object + // indexedTrigger is used for optimizing amount of watchers that needs to process // an incoming event. indexedTrigger *indexedTriggerFunc @@ -371,6 +374,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { groupResource: config.GroupResource, versioner: config.Versioner, newFunc: config.NewFunc, + newListFunc: config.NewListFunc, indexedTrigger: indexedTrigger, watcherIdx: 0, watchers: indexedWatchers{ @@ -498,19 +502,18 @@ type namespacedName struct { // Watch implements storage.Interface. func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { - if opts.SendInitialEvents != nil { - return nil, errors.NewInvalid( - schema.GroupKind{Group: c.groupResource.Group, Kind: c.groupResource.Resource}, - "", - field.ErrorList{field.Forbidden(field.NewPath("sendInitialEvents"), "for watch is not yet implemented by the watch cache")}, - ) - } pred := opts.Predicate - // If the resourceVersion is unset, ensure that the rv - // from which the watch is being served, is the latest + // if the watch-list feature wasn't set and the resourceVersion is unset + // ensure that the rv from which the watch is being served, is the latest // one. "latest" is ensured by serving the watch from // the underlying storage. - if opts.ResourceVersion == "" { + // + // it should never happen due to our validation but let's just be super-safe here + // and disable sendingInitialEvents when the feature wasn't enabled + if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil { + opts.SendInitialEvents = nil + } + if opts.SendInitialEvents == nil && opts.ResourceVersion == "" { return c.storage.Watch(ctx, key, opts) } watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion) @@ -553,6 +556,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // watchers on our watcher having a processing hiccup chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported) + // Determine a function that computes the bookmarkAfterResourceVersion + bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, watchRV, opts) + if err != nil { + return newErrWatcher(err), nil + } + + // Determine a function that computes the watchRV we should start from + startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, watchRV, opts) + if err != nil { + return newErrWatcher(err), nil + } + // Determine watch timeout('0' means deadline is not set, ignore checking) deadline, _ := ctx.Deadline() @@ -580,6 +595,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // underlying watchCache is calling processEvent under its lock. c.watchCache.RLock() defer c.watchCache.RUnlock() + watchRV = startWatchResourceVersionFn() cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV) if err != nil { // To match the uncached watch implementation, once we have passed authn/authz/admission, @@ -593,6 +609,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions defer c.Unlock() // Update watcher.forget function once we can compute it. 
watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, scope, triggerValue, triggerSupported) + // Update the bookMarkAfterResourceVersion + watcher.setBookmarkAfterResourceVersion(bookmarkAfterResourceVersionFn()) c.watchers.addWatcher(watcher, c.watcherIdx, scope, triggerValue, triggerSupported) // Add it to the queue only when the client support watch bookmarks. @@ -1165,6 +1183,88 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) { return c.versioner.ParseResourceVersion(resourceVersion) } +// getCurrentResourceVersionFromStorage gets the current resource version from the underlying storage engine. +// this method issues an empty list request and reads only the ResourceVersion from the object metadata +func (c *Cacher) getCurrentResourceVersionFromStorage(ctx context.Context) (uint64, error) { + if c.newListFunc == nil { + return 0, fmt.Errorf("newListFunction wasn't provided for %v", c.objectType) + } + emptyList := c.newListFunc() + pred := storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, // just in case we actually hit something + } + + err := c.storage.GetList(ctx, c.resourcePrefix, storage.ListOptions{Predicate: pred}, emptyList) + if err != nil { + return 0, err + } + emptyListAccessor, err := meta.ListAccessor(emptyList) + if err != nil { + return 0, err + } + if emptyListAccessor == nil { + return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList) + } + + currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion()) + if err != nil { + return 0, err + } + + if currentResourceVersion == 0 { + return 0, fmt.Errorf("the current resource version must be greater than 0") + } + return uint64(currentResourceVersion), nil +} + +// getBookmarkAfterResourceVersionLockedFunc returns a function that +// spits a ResourceVersion after which the bookmark event will be delivered. +// +// The returned function must be called under the watchCache lock. +func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(ctx context.Context, parsedResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) { + if opts.SendInitialEvents == nil || *opts.SendInitialEvents == false || !opts.Predicate.AllowWatchBookmarks { + return func() uint64 { return 0 }, nil + } + return c.getCommonResourceVersionLockedFunc(ctx, parsedResourceVersion, opts) +} + +// getStartResourceVersionForWatchLockedFunc returns a function that +// spits a ResourceVersion the watch will be started from. +// Depending on the input parameters the semantics of the returned ResourceVersion are: +// - start at Exact (return parsedWatchResourceVersion) +// - start at Most Recent (return an RV from etcd) +// - start at Any (return the current watchCache's RV) +// +// The returned function must be called under the watchCache lock. +func (c *Cacher) getStartResourceVersionForWatchLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) { + if opts.SendInitialEvents == nil || *opts.SendInitialEvents == true { + return func() uint64 { return parsedWatchResourceVersion }, nil + } + return c.getCommonResourceVersionLockedFunc(ctx, parsedWatchResourceVersion, opts) +} + +// getCommonResourceVersionLockedFunc a helper that simply computes a ResourceVersion +// based on the input parameters. Please examine callers of this method to get more context. +// +// The returned function must be called under the watchCache lock. 
+func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) { + switch { + case len(opts.ResourceVersion) == 0: + rv, err := c.getCurrentResourceVersionFromStorage(ctx) + if err != nil { + return nil, err + } + return func() uint64 { return rv }, nil + case parsedWatchResourceVersion == 0: + // here we assume that watchCache locked is already held + return func() uint64 { return c.watchCache.resourceVersion }, nil + default: + return func() uint64 { return parsedWatchResourceVersion }, nil + } +} + // cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher. type cacherListerWatcher struct { storage storage.Interface diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 844bf530c9d..cc6177b06be 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -27,6 +27,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + + "k8s.io/apimachinery/pkg/api/apitesting" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -41,8 +44,14 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/etcd3" + etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + "k8s.io/apiserver/pkg/storage/value/encrypt/identity" + utilfeature "k8s.io/apiserver/pkg/util/feature" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" ) @@ -76,6 +85,9 @@ func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) { return strconv.ParseUint(version, 10, 64) } func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) { + if len(resourceVersion) == 0 { + return 0, nil + } return strconv.ParseUint(resourceVersion, 10, 64) } @@ -111,7 +123,8 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) { type dummyStorage struct { sync.RWMutex - err error + err error + getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error } type dummyWatch struct { @@ -151,10 +164,12 @@ func (d *dummyStorage) Get(_ context.Context, _ string, _ storage.GetOptions, _ return d.err } -func (d *dummyStorage) GetList(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error { +func (d *dummyStorage) GetList(ctx context.Context, resPrefix string, opts storage.ListOptions, listObj runtime.Object) error { + if d.getListFn != nil { + return d.getListFn(ctx, resPrefix, opts, listObj) + } d.RLock() defer d.RUnlock() - podList := listObj.(*example.PodList) podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"} return d.err @@ -1082,28 +1097,68 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { } } -func verifyEvents(t *testing.T, w watch.Interface, events []watch.Event) { +func verifyEvents(t *testing.T, w watch.Interface, events []watch.Event, strictOrder bool) { _, _, line, _ := goruntime.Caller(1) - for _, expectedEvent := range events { + actualEvents := make([]watch.Event, len(events)) + for idx := range events { select 
{ case event := <-w.ResultChan(): - if e, a := expectedEvent.Type, event.Type; e != a { - t.Logf("(called from line %d)", line) - t.Errorf("Expected: %s, got: %s", e, a) - } - object := event.Object - if co, ok := object.(runtime.CacheableObject); ok { - object = co.GetObject() - } - if e, a := expectedEvent.Object, object; !apiequality.Semantic.DeepEqual(e, a) { - t.Logf("(called from line %d)", line) - t.Errorf("Expected: %#v, got: %#v", e, a) - } + actualEvents[idx] = event case <-time.After(wait.ForeverTestTimeout): t.Logf("(called from line %d)", line) t.Errorf("Timed out waiting for an event") } } + validateEvents := func(expected, actual watch.Event) (bool, []string) { + errors := []string{} + if e, a := expected.Type, actual.Type; e != a { + errors = append(errors, fmt.Sprintf("Expected: %s, got: %s", e, a)) + } + actualObject := actual.Object + if co, ok := actualObject.(runtime.CacheableObject); ok { + actualObject = co.GetObject() + } + if e, a := expected.Object, actualObject; !apiequality.Semantic.DeepEqual(e, a) { + errors = append(errors, fmt.Sprintf("Expected: %#v, got: %#v", e, a)) + } + return len(errors) == 0, errors + } + + if len(events) != len(actualEvents) { + t.Fatalf("unexpected number of events: %d, expected: %d, acutalEvents: %#v, expectedEvents:%#v", len(actualEvents), len(events), actualEvents, events) + } + + if strictOrder { + for idx, expectedEvent := range events { + valid, errors := validateEvents(expectedEvent, actualEvents[idx]) + if !valid { + t.Logf("(called from line %d)", line) + for _, err := range errors { + t.Errorf(err) + } + } + } + } + for _, expectedEvent := range events { + validated := false + for _, actualEvent := range actualEvents { + if validated, _ = validateEvents(expectedEvent, actualEvent); validated { + break + } + } + if !validated { + t.Fatalf("Expected: %#v but didn't find", expectedEvent) + } + } +} + +func verifyNoEvents(t *testing.T, w watch.Interface) { + select { + case e := <-w.ResultChan(): + t.Errorf("Unexpected: %#v event received, expected no events", e) + case <-time.After(time.Second): + return + } } func TestCachingDeleteEvents(t *testing.T) { @@ -1183,9 +1238,9 @@ func TestCachingDeleteEvents(t *testing.T) { cacher.watchCache.Update(pod3) cacher.watchCache.Delete(pod4) - verifyEvents(t, allEventsWatcher, allEvents) - verifyEvents(t, fooEventsWatcher, fooEvents) - verifyEvents(t, barEventsWatcher, barEvents) + verifyEvents(t, allEventsWatcher, allEvents, true) + verifyEvents(t, fooEventsWatcher, fooEvents, true) + verifyEvents(t, barEventsWatcher, barEvents, true) } func testCachingObjects(t *testing.T, watchersCount int) { @@ -1367,3 +1422,272 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) { t.Errorf("unexpected number of events received, expected: %d, got: %d", bufferSize+1, received) } } + +func TestCacherWatchSemantics(t *testing.T) { + trueVal, falseVal := true, false + makePod := func(rv uint64) *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", rv), + Namespace: "ns", + ResourceVersion: fmt.Sprintf("%d", rv), + Annotations: map[string]string{}, + }, + } + } + + scenarios := []struct { + name string + allowWatchBookmarks bool + sendInitialEvents *bool + resourceVersion string + storageResourceVersion string + + initialPods []*example.Pod + podsAfterEstablishingWatch []*example.Pod + + expectedInitialEventsInStrictOrder []watch.Event + expectedInitialEventsInRandomOrder []watch.Event + expectedEventsAfterEstablishingWatch []watch.Event + }{ + { + 
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset, storageRV=102", + allowWatchBookmarks: true, + sendInitialEvents: &trueVal, + storageResourceVersion: "102", + initialPods: []*example.Pod{makePod(101)}, + podsAfterEstablishingWatch: []*example.Pod{makePod(102)}, + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}}, + expectedEventsAfterEstablishingWatch: []watch.Event{ + {Type: watch.Added, Object: makePod(102)}, + {Type: watch.Bookmark, Object: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "102", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + }, + }, + { + name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0, storageRV=105", + allowWatchBookmarks: true, + sendInitialEvents: &trueVal, + resourceVersion: "0", + storageResourceVersion: "105", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + expectedInitialEventsInRandomOrder: []watch.Event{ + {Type: watch.Added, Object: makePod(101)}, + {Type: watch.Added, Object: makePod(102)}, + }, + expectedInitialEventsInStrictOrder: []watch.Event{ + {Type: watch.Bookmark, Object: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "102", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + }, + }, + { + name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=101, storageRV=105", + allowWatchBookmarks: true, + sendInitialEvents: &trueVal, + resourceVersion: "101", + storageResourceVersion: "105", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(102)}}, + expectedInitialEventsInStrictOrder: []watch.Event{ + {Type: watch.Bookmark, Object: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "102", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + }, + }, + { + name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset, storageRV=102", + sendInitialEvents: &trueVal, + storageResourceVersion: "102", + initialPods: []*example.Pod{makePod(101)}, + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}}, + podsAfterEstablishingWatch: []*example.Pod{makePod(102)}, + expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(102)}}, + }, + { + // note we set storage's RV to some future value, mustn't be used by this scenario + name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=0, storageRV=105", + sendInitialEvents: &trueVal, + resourceVersion: "0", + storageResourceVersion: "105", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, + }, + { + // note we set storage's RV to some future value, mustn't be used by this scenario + name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=101, storageRV=105", + sendInitialEvents: &trueVal, + resourceVersion: "101", + storageResourceVersion: "105", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + // make sure we only get initial events that are > initial RV (101) + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(102)}}, + }, + { + name: "sendInitialEvents=false, RV=unset, storageRV=103", + sendInitialEvents: &falseVal, + storageResourceVersion: "103", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + 
podsAfterEstablishingWatch: []*example.Pod{makePod(104)}, + expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(104)}}, + }, + { + // note we set storage's RV to some future value, mustn't be used by this scenario + name: "sendInitialEvents=false, RV=0, storageRV=105", + sendInitialEvents: &falseVal, + resourceVersion: "0", + storageResourceVersion: "105", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + podsAfterEstablishingWatch: []*example.Pod{makePod(103)}, + expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(103)}}, + }, + { + // note we set storage's RV to some future value, mustn't be used by this scenario + name: "legacy, RV=0, storageRV=105", + resourceVersion: "0", + storageResourceVersion: "105", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, + }, + { + // note we set storage's RV to some future value, mustn't be used by this scenario + name: "legacy, RV=unset, storageRV=105", + storageResourceVersion: "105", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + // no events because the watch is delegated to the underlying storage + }, + } + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + // set up env + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() + storageListMetaResourceVersion := "" + backingStorage := &dummyStorage{getListFn: func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error { + podList := listObj.(*example.PodList) + podList.ListMeta = metav1.ListMeta{ResourceVersion: storageListMetaResourceVersion} + return nil + }} + + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("falied to create cacher: %v", err) + } + defer cacher.Stop() + if err := cacher.ready.wait(context.TODO()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + + // now, run a scenario + // but first let's add some initial data + for _, obj := range scenario.initialPods { + err = cacher.watchCache.Add(obj) + require.NoError(t, err, "failed to add a pod: %v") + } + // read request params + opts := storage.ListOptions{Predicate: storage.Everything} + opts.SendInitialEvents = scenario.sendInitialEvents + opts.Predicate.AllowWatchBookmarks = scenario.allowWatchBookmarks + if len(scenario.resourceVersion) > 0 { + opts.ResourceVersion = scenario.resourceVersion + } + // before starting a new watch set a storage RV to some future value + storageListMetaResourceVersion = scenario.storageResourceVersion + + w, err := cacher.Watch(context.Background(), "pods/ns", opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + + // make sure we only get initial events + verifyEvents(t, w, scenario.expectedInitialEventsInRandomOrder, false) + verifyEvents(t, w, scenario.expectedInitialEventsInStrictOrder, true) + verifyNoEvents(t, w) + // add a pod that is greater than the storage's RV when the watch was started + for _, obj := range scenario.podsAfterEstablishingWatch { + err = cacher.watchCache.Add(obj) + require.NoError(t, err, "failed to add a pod: %v") + } + verifyEvents(t, w, scenario.expectedEventsAfterEstablishingWatch, true) + verifyNoEvents(t, w) + }) + } +} + +func TestGetCurrentResourceVersionFromStorage(t *testing.T) { + // test data + newEtcdTestStorage 
:= func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { + server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, example.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, prefix, schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig()) + return server, storage + } + server, etcdStorage := newEtcdTestStorage(t, "") + defer server.Terminate(t) + podCacher, versioner, err := newTestCacher(etcdStorage) + if err != nil { + t.Fatalf("Couldn't create podCacher: %v", err) + } + defer podCacher.Stop() + + makePod := func(name string) *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name}, + } + } + createPod := func(obj *example.Pod) *example.Pod { + key := "pods/" + obj.Namespace + "/" + obj.Name + out := &example.Pod{} + err := etcdStorage.Create(context.TODO(), key, obj, out, 0) + require.NoError(t, err) + return out + } + getPod := func(name, ns string) *example.Pod { + key := "pods/" + ns + "/" + name + out := &example.Pod{} + err := etcdStorage.Get(context.TODO(), key, storage.GetOptions{}, out) + require.NoError(t, err) + return out + } + makeReplicaSet := func(name string) *example.ReplicaSet { + return &example.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name}, + } + } + createReplicaSet := func(obj *example.ReplicaSet) *example.ReplicaSet { + key := "replicasets/" + obj.Namespace + "/" + obj.Name + out := &example.ReplicaSet{} + err := etcdStorage.Create(context.TODO(), key, obj, out, 0) + require.NoError(t, err) + return out + } + + // create a pod and make sure its RV is equal to the one maintained by etcd + pod := createPod(makePod("pod-1")) + currentStorageRV, err := podCacher.getCurrentResourceVersionFromStorage(context.TODO()) + require.NoError(t, err) + podRV, err := versioner.ParseResourceVersion(pod.ResourceVersion) + require.NoError(t, err) + require.Equal(t, currentStorageRV, podRV, "expected the global etcd RV to be equal to pod's RV") + + // now create a replicaset (new resource) and make sure the target function returns global etcd RV + rs := createReplicaSet(makeReplicaSet("replicaset-1")) + currentStorageRV, err = podCacher.getCurrentResourceVersionFromStorage(context.TODO()) + require.NoError(t, err) + rsRV, err := versioner.ParseResourceVersion(rs.ResourceVersion) + require.NoError(t, err) + require.Equal(t, currentStorageRV, rsRV, "expected the global etcd RV to be equal to replicaset's RV") + + // ensure that the pod's RV hasn't been changed + currentPod := getPod(pod.Name, pod.Namespace) + currentPodRV, err := versioner.ParseResourceVersion(currentPod.ResourceVersion) + require.NoError(t, err) + require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed") +}
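
Usage note (not part of the patch): the following is a minimal client-side sketch of how the watch-list semantics wired up above could be consumed, assuming the WatchList feature gate is enabled and that SendInitialEvents/AllowWatchBookmarks are exposed via metav1.ListOptions. The function name streamPods, the namespace, and the use of ResourceVersionMatchNotOlderThan reflect my assumptions about the request validation, not anything guaranteed by this diff; the "k8s.io/initial-events-end" annotation on the first bookmark is what the cacher change above sets.

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// streamPods replays the current pod state as ADDED events and then switches
// to incremental updates, using the initial-events-end bookmark as the divider.
func streamPods(ctx context.Context, cfg *rest.Config) error {
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return err
	}
	sendInitialEvents := true
	w, err := client.CoreV1().Pods("default").Watch(ctx, metav1.ListOptions{
		// ask the server to stream the initial state first
		SendInitialEvents: &sendInitialEvents,
		// required so the server can send the terminating bookmark
		AllowWatchBookmarks: true,
		// assumed to be required by validation when SendInitialEvents is set
		ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
	})
	if err != nil {
		return err
	}
	defer w.Stop()

	for event := range w.ResultChan() {
		if event.Type == watch.Bookmark {
			// the cacher annotates the first bookmark with an RV >= bookmarkAfterResourceVersion
			if obj, ok := event.Object.(metav1.Object); ok &&
				obj.GetAnnotations()["k8s.io/initial-events-end"] == "true" {
				fmt.Println("initial state fully received, switching to incremental updates")
			}
			continue
		}
		pod, ok := event.Object.(*corev1.Pod)
		if !ok {
			continue
		}
		fmt.Printf("%s pod %s/%s\n", event.Type, pod.Namespace, pod.Name)
	}
	return nil
}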