From 202c415847ae9058d3138ac9001ac0b32038ef2e Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Fri, 19 Jan 2024 13:48:29 +0100 Subject: [PATCH] client-go/reflector: make UseWatchList a pointer until #115478(use streaming against the etcd storage) is resolved the cacher need a way to disable the streaming. Kubernetes-commit: 41e706600aea7468f486150d951d3b8948ce89d5 --- tools/cache/reflector.go | 18 +++++++++++++----- tools/cache/reflector_watchlist_test.go | 5 +++-- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index c1ea13de..f733e244 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -43,6 +43,7 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/clock" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" "k8s.io/utils/trace" ) @@ -107,7 +108,9 @@ type Reflector struct { // might result in an increased memory consumption of the APIServer. // // See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details - UseWatchList bool + // + // TODO(#115478): Consider making reflector.UseWatchList a private field. Since we implemented "api streaming" on the etcd storage layer it should work. + UseWatchList *bool } // ResourceVersionUpdater is an interface that allows store implementation to @@ -237,8 +240,12 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S r.expectedGVK = getExpectedGVKFromObject(expectedType) } - if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 { - r.UseWatchList = true + // don't overwrite UseWatchList if already set + // because the higher layers (e.g. storage/cacher) disabled it on purpose + if r.UseWatchList == nil { + if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 { + r.UseWatchList = ptr.To(true) + } } return r @@ -325,9 +332,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name) var err error var w watch.Interface - fallbackToList := !r.UseWatchList + useWatchList := ptr.Deref(r.UseWatchList, false) + fallbackToList := !useWatchList - if r.UseWatchList { + if useWatchList { w, err = r.watchList(stopCh) if w == nil && err == nil { // stopCh was closed diff --git a/tools/cache/reflector_watchlist_test.go b/tools/cache/reflector_watchlist_test.go index c43db073..76eacb5a 100644 --- a/tools/cache/reflector_watchlist_test.go +++ b/tools/cache/reflector_watchlist_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" ) func TestWatchList(t *testing.T) { @@ -415,7 +416,7 @@ func TestWatchList(t *testing.T) { listWatcher.customListResponse = scenario.podList listWatcher.closeAfterListRequests = scenario.closeAfterListRequests if scenario.disableUseWatchList { - reflector.UseWatchList = false + reflector.UseWatchList = ptr.To(false) } err := reflector.ListAndWatch(stopCh) @@ -505,7 +506,7 @@ func testData() (*fakeListWatcher, Store, *Reflector, chan struct{}) { }, } r := NewReflector(lw, &v1.Pod{}, s, 0) - r.UseWatchList = true + r.UseWatchList = ptr.To(true) return lw, s, r, stopCh }