From 1785b5afec70c668642a0243f209ad1bba558280 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Wed, 25 Jun 2025 13:04:29 +0200 Subject: [PATCH] client-go/reflector: stop exposing UseWatchList (#132453) * client-go/reflector: stop exposing UseWatchList * apiserver/cacher: stop setting reflector.UseWatchList * test/integration/watchlist: fix TestReflectorWatchListFallback Kubernetes-commit: b8b3984874e930c92057589fd1a7668dbdffc117 --- tools/cache/reflector.go | 18 +++++------------- tools/cache/reflector_test.go | 4 +++- tools/cache/reflector_watchlist_test.go | 8 +++----- 3 files changed, 11 insertions(+), 19 deletions(-) diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 5a7113da..47626610 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -43,7 +43,6 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/clock" "k8s.io/utils/pointer" - "k8s.io/utils/ptr" "k8s.io/utils/trace" ) @@ -130,7 +129,7 @@ type Reflector struct { ShouldResync func() bool // MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch. MaxInternalErrorRetryDuration time.Duration - // UseWatchList if turned on instructs the reflector to open a stream to bring data from the API server. + // useWatchList if turned on instructs the reflector to open a stream to bring data from the API server. // Streaming has the primary advantage of using fewer server's resources to fetch data. // // The old behaviour establishes a LIST request which gets data in chunks. @@ -138,9 +137,7 @@ type Reflector struct { // might result in an increased memory consumption of the APIServer. // // See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details - // - // TODO(#115478): Consider making reflector.UseWatchList a private field. Since we implemented "api streaming" on the etcd storage layer it should work. - UseWatchList *bool + useWatchList bool } func (r *Reflector) Name() string { @@ -293,11 +290,7 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store R r.expectedGVK = getExpectedGVKFromObject(expectedType) } - // don't overwrite UseWatchList if already set - // because the higher layers (e.g. storage/cacher) disabled it on purpose - if r.UseWatchList == nil { - r.UseWatchList = ptr.To(clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient)) - } + r.useWatchList = clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) return r } @@ -403,8 +396,7 @@ func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error { logger.V(3).Info("Listing and watching", "type", r.typeDescription, "reflector", r.name) var err error var w watch.Interface - useWatchList := ptr.Deref(r.UseWatchList, false) - fallbackToList := !useWatchList + fallbackToList := !r.useWatchList defer func() { if w != nil { @@ -412,7 +404,7 @@ func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error { } }() - if useWatchList { + if r.useWatchList { w, err = r.watchList(ctx) if w == nil && err == nil { // stopCh was closed diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index c1c3fef0..bc3168af 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -47,6 +47,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + clientfeatures "k8s.io/client-go/features" + clientfeaturestesting "k8s.io/client-go/features/testing" "k8s.io/klog/v2/ktesting" "k8s.io/utils/clock" testingclock "k8s.io/utils/clock/testing" @@ -533,6 +535,7 @@ func TestReflectorListAndWatch(t *testing.T) { } for _, tc := range table { t.Run(tc.name, func(t *testing.T) { + clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, tc.useWatchList) watcherCh := make(chan *watch.FakeWatcher) var listOpts, watchOpts []metav1.ListOptions @@ -563,7 +566,6 @@ func TestReflectorListAndWatch(t *testing.T) { } s := NewFIFO(MetaNamespaceKeyFunc) r := NewReflector(lw, &v1.Pod{}, s, 0) - r.UseWatchList = ptr.To(tc.useWatchList) // Start ListAndWatch in the background. // When it returns, it will send an error or nil on the error diff --git a/tools/cache/reflector_watchlist_test.go b/tools/cache/reflector_watchlist_test.go index 4bf1817d..301edee5 100644 --- a/tools/cache/reflector_watchlist_test.go +++ b/tools/cache/reflector_watchlist_test.go @@ -36,10 +36,11 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + clientfeatures "k8s.io/client-go/features" + clientfeaturestesting "k8s.io/client-go/features/testing" "k8s.io/klog/v2/ktesting" testingclock "k8s.io/utils/clock/testing" "k8s.io/utils/pointer" - "k8s.io/utils/ptr" ) func TestInitialEventsEndBookmarkTicker(t *testing.T) { @@ -474,6 +475,7 @@ func TestWatchList(t *testing.T) { t.Run(s.name, func(t *testing.T) { scenario := s // capture as local variable _, ctx := ktesting.NewTestContext(t) + clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, !scenario.disableUseWatchList) listWatcher, store, reflector, ctx, cancel := testData(ctx) go func() { for i, e := range scenario.watchEvents { @@ -491,9 +493,6 @@ func TestWatchList(t *testing.T) { listWatcher.closeAfterWatchRequests = scenario.closeAfterWatchRequests listWatcher.customListResponse = scenario.podList listWatcher.closeAfterListRequests = scenario.closeAfterListRequests - if scenario.disableUseWatchList { - reflector.UseWatchList = ptr.To(false) - } err := reflector.ListAndWatchWithContext(ctx) if scenario.expectedError != nil && err == nil { @@ -582,7 +581,6 @@ func testData(ctx context.Context) (*fakeListWatcher, Store, *Reflector, context }, } r := NewReflector(lw, &v1.Pod{}, s, 0) - r.UseWatchList = ptr.To(true) return lw, s, r, ctx, cancel }