diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 3778a83c0fc..079d197a2cc 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -44,9 +44,9 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" "k8s.io/component-base/tracing" - "k8s.io/klog/v2" "k8s.io/utils/clock" + "k8s.io/utils/ptr" ) var ( @@ -413,7 +413,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { reflector.MaxInternalErrorRetryDuration = time.Second * 30 // since the watch-list is provided by the watch cache instruct // the reflector to issue a regular LIST against the store - reflector.UseWatchList = false + reflector.UseWatchList = ptr.To(false) cacher.watchCache = watchCache cacher.reflector = reflector diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index c1ea13de574..f733e244ccc 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -43,6 +43,7 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/clock" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" "k8s.io/utils/trace" ) @@ -107,7 +108,9 @@ type Reflector struct { // might result in an increased memory consumption of the APIServer. // // See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details - UseWatchList bool + // + // TODO(#115478): Consider making reflector.UseWatchList a private field. Since we implemented "api streaming" on the etcd storage layer it should work. + UseWatchList *bool } // ResourceVersionUpdater is an interface that allows store implementation to @@ -237,8 +240,12 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S r.expectedGVK = getExpectedGVKFromObject(expectedType) } - if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 { - r.UseWatchList = true + // don't overwrite UseWatchList if already set + // because the higher layers (e.g. storage/cacher) disabled it on purpose + if r.UseWatchList == nil { + if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 { + r.UseWatchList = ptr.To(true) + } } return r @@ -325,9 +332,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name) var err error var w watch.Interface - fallbackToList := !r.UseWatchList + useWatchList := ptr.Deref(r.UseWatchList, false) + fallbackToList := !useWatchList - if r.UseWatchList { + if useWatchList { w, err = r.watchList(stopCh) if w == nil && err == nil { // stopCh was closed diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_watchlist_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_watchlist_test.go index c43db073c5a..76eacb5a515 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_watchlist_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_watchlist_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" ) func TestWatchList(t *testing.T) { @@ -415,7 +416,7 @@ func TestWatchList(t *testing.T) { listWatcher.customListResponse = scenario.podList listWatcher.closeAfterListRequests = scenario.closeAfterListRequests if scenario.disableUseWatchList { - reflector.UseWatchList = false + reflector.UseWatchList = ptr.To(false) } err := reflector.ListAndWatch(stopCh) @@ -505,7 +506,7 @@ func testData() (*fakeListWatcher, Store, *Reflector, chan struct{}) { }, } r := NewReflector(lw, &v1.Pod{}, s, 0) - r.UseWatchList = true + r.UseWatchList = ptr.To(true) return lw, s, r, stopCh } diff --git a/test/integration/apimachinery/watchlist_test.go b/test/integration/apimachinery/watchlist_test.go index 127714d0c9a..77549f05df0 100644 --- a/test/integration/apimachinery/watchlist_test.go +++ b/test/integration/apimachinery/watchlist_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/client-go/tools/cache" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" + "k8s.io/utils/ptr" ) func TestReflectorWatchListFallback(t *testing.T) { @@ -67,7 +68,7 @@ func TestReflectorWatchListFallback(t *testing.T) { lw := &wrappedListWatch{&cache.ListWatch{}} lw.SetClient(ctx, clientSet, ns) target := cache.NewReflector(lw, &v1.Secret{}, store, time.Duration(0)) - target.UseWatchList = true + target.UseWatchList = ptr.To(true) t.Log("Waiting until the secret reflector synchronises to the store (call to the Replace method)") reflectorCtx, reflectorCtxCancel := context.WithCancel(context.Background())