Merge pull request #125667 from p0lyn0mial/upstream-watchlist-off-when-progress-notification-disabled

cacher: returns an error when watch list was requested and storage.RequestWatchProgress is disabled
This commit is contained in:
Kubernetes Prow Robot 2024-06-25 14:18:26 -07:00 committed by GitHub
commit c2fc33869f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -586,6 +586,12 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// watchers on our watcher having a processing hiccup
chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported)
// client-go is going to fall back to a standard LIST on any error
// returned for watch-list requests
if isListWatchRequest(opts) && !etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) {
return newErrWatcher(fmt.Errorf("a watch stream was requested by the client but the required storage feature %s is disabled", storage.RequestWatchProgress)), nil
}
// Determine the ResourceVersion to which the watch cache must be synchronized
requiredResourceVersion, err := c.getWatchCacheResourceVersion(ctx, requestedWatchRV, opts)
if err != nil {
@ -1339,7 +1345,7 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
//
// The returned function must be called under the watchCache lock.
func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(parsedResourceVersion, requiredResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
if opts.SendInitialEvents == nil || !*opts.SendInitialEvents || !opts.Predicate.AllowWatchBookmarks {
if !isListWatchRequest(opts) {
return func() uint64 { return 0 }, nil
}
@ -1354,6 +1360,10 @@ func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(parsedResourceVersion
}
}
func isListWatchRequest(opts storage.ListOptions) bool {
return opts.SendInitialEvents != nil && *opts.SendInitialEvents && opts.Predicate.AllowWatchBookmarks
}
// getWatchCacheResourceVersion returns a ResourceVersion to which the watch cache must be synchronized to
//
// Depending on the input parameters, the semantics of the returned ResourceVersion are: