Merge pull request #122873 from p0lyn0mial/upstream-reflector-usewatchlist-pointer

client-go/reflector: make UseWatchList a pointer
This commit is contained in:
Kubernetes Prow Robot 2024-01-22 18:20:58 +01:00 committed by GitHub
commit 445869a59b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 20 additions and 10 deletions

View File

@ -44,9 +44,9 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing" "k8s.io/component-base/tracing"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/utils/clock" "k8s.io/utils/clock"
"k8s.io/utils/ptr"
) )
var ( var (
@ -413,7 +413,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
reflector.MaxInternalErrorRetryDuration = time.Second * 30 reflector.MaxInternalErrorRetryDuration = time.Second * 30
// since the watch-list is provided by the watch cache instruct // since the watch-list is provided by the watch cache instruct
// the reflector to issue a regular LIST against the store // the reflector to issue a regular LIST against the store
reflector.UseWatchList = false reflector.UseWatchList = ptr.To(false)
cacher.watchCache = watchCache cacher.watchCache = watchCache
cacher.reflector = reflector cacher.reflector = reflector

View File

@ -43,6 +43,7 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/utils/clock" "k8s.io/utils/clock"
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
"k8s.io/utils/ptr"
"k8s.io/utils/trace" "k8s.io/utils/trace"
) )
@ -107,7 +108,9 @@ type Reflector struct {
// might result in an increased memory consumption of the APIServer. // might result in an increased memory consumption of the APIServer.
// //
// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details // See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
UseWatchList bool //
// TODO(#115478): Consider making reflector.UseWatchList a private field. Since we implemented "api streaming" on the etcd storage layer it should work.
UseWatchList *bool
} }
// ResourceVersionUpdater is an interface that allows store implementation to // ResourceVersionUpdater is an interface that allows store implementation to
@ -237,8 +240,12 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S
r.expectedGVK = getExpectedGVKFromObject(expectedType) r.expectedGVK = getExpectedGVKFromObject(expectedType)
} }
if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 { // don't overwrite UseWatchList if already set
r.UseWatchList = true // because the higher layers (e.g. storage/cacher) disabled it on purpose
if r.UseWatchList == nil {
if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 {
r.UseWatchList = ptr.To(true)
}
} }
return r return r
@ -325,9 +332,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name) klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
var err error var err error
var w watch.Interface var w watch.Interface
fallbackToList := !r.UseWatchList useWatchList := ptr.Deref(r.UseWatchList, false)
fallbackToList := !useWatchList
if r.UseWatchList { if useWatchList {
w, err = r.watchList(stopCh) w, err = r.watchList(stopCh)
if w == nil && err == nil { if w == nil && err == nil {
// stopCh was closed // stopCh was closed

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
"k8s.io/utils/ptr"
) )
func TestWatchList(t *testing.T) { func TestWatchList(t *testing.T) {
@ -415,7 +416,7 @@ func TestWatchList(t *testing.T) {
listWatcher.customListResponse = scenario.podList listWatcher.customListResponse = scenario.podList
listWatcher.closeAfterListRequests = scenario.closeAfterListRequests listWatcher.closeAfterListRequests = scenario.closeAfterListRequests
if scenario.disableUseWatchList { if scenario.disableUseWatchList {
reflector.UseWatchList = false reflector.UseWatchList = ptr.To(false)
} }
err := reflector.ListAndWatch(stopCh) err := reflector.ListAndWatch(stopCh)
@ -505,7 +506,7 @@ func testData() (*fakeListWatcher, Store, *Reflector, chan struct{}) {
}, },
} }
r := NewReflector(lw, &v1.Pod{}, s, 0) r := NewReflector(lw, &v1.Pod{}, s, 0)
r.UseWatchList = true r.UseWatchList = ptr.To(true)
return lw, s, r, stopCh return lw, s, r, stopCh
} }

View File

@ -35,6 +35,7 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
"k8s.io/utils/ptr"
) )
func TestReflectorWatchListFallback(t *testing.T) { func TestReflectorWatchListFallback(t *testing.T) {
@ -67,7 +68,7 @@ func TestReflectorWatchListFallback(t *testing.T) {
lw := &wrappedListWatch{&cache.ListWatch{}} lw := &wrappedListWatch{&cache.ListWatch{}}
lw.SetClient(ctx, clientSet, ns) lw.SetClient(ctx, clientSet, ns)
target := cache.NewReflector(lw, &v1.Secret{}, store, time.Duration(0)) target := cache.NewReflector(lw, &v1.Secret{}, store, time.Duration(0))
target.UseWatchList = true target.UseWatchList = ptr.To(true)
t.Log("Waiting until the secret reflector synchronises to the store (call to the Replace method)") t.Log("Waiting until the secret reflector synchronises to the store (call to the Replace method)")
reflectorCtx, reflectorCtxCancel := context.WithCancel(context.Background()) reflectorCtx, reflectorCtxCancel := context.WithCancel(context.Background())