mirror of
https://github.com/kubernetes/client-go.git
synced 2025-08-31 14:44:50 +00:00
client-go/reflector: make UseWatchList a pointer
until #115478(use streaming against the etcd storage) is resolved the cacher need a way to disable the streaming. Kubernetes-commit: 41e706600aea7468f486150d951d3b8948ce89d5
This commit is contained in:
committed by
Kubernetes Publisher
parent
ffe7bf60eb
commit
202c415847
18
tools/cache/reflector.go
vendored
18
tools/cache/reflector.go
vendored
@@ -43,6 +43,7 @@ import (
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/utils/clock"
|
||||
"k8s.io/utils/pointer"
|
||||
"k8s.io/utils/ptr"
|
||||
"k8s.io/utils/trace"
|
||||
)
|
||||
|
||||
@@ -107,7 +108,9 @@ type Reflector struct {
|
||||
// might result in an increased memory consumption of the APIServer.
|
||||
//
|
||||
// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
|
||||
UseWatchList bool
|
||||
//
|
||||
// TODO(#115478): Consider making reflector.UseWatchList a private field. Since we implemented "api streaming" on the etcd storage layer it should work.
|
||||
UseWatchList *bool
|
||||
}
|
||||
|
||||
// ResourceVersionUpdater is an interface that allows store implementation to
|
||||
@@ -237,8 +240,12 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S
|
||||
r.expectedGVK = getExpectedGVKFromObject(expectedType)
|
||||
}
|
||||
|
||||
if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 {
|
||||
r.UseWatchList = true
|
||||
// don't overwrite UseWatchList if already set
|
||||
// because the higher layers (e.g. storage/cacher) disabled it on purpose
|
||||
if r.UseWatchList == nil {
|
||||
if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 {
|
||||
r.UseWatchList = ptr.To(true)
|
||||
}
|
||||
}
|
||||
|
||||
return r
|
||||
@@ -325,9 +332,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
|
||||
var err error
|
||||
var w watch.Interface
|
||||
fallbackToList := !r.UseWatchList
|
||||
useWatchList := ptr.Deref(r.UseWatchList, false)
|
||||
fallbackToList := !useWatchList
|
||||
|
||||
if r.UseWatchList {
|
||||
if useWatchList {
|
||||
w, err = r.watchList(stopCh)
|
||||
if w == nil && err == nil {
|
||||
// stopCh was closed
|
||||
|
5
tools/cache/reflector_watchlist_test.go
vendored
5
tools/cache/reflector_watchlist_test.go
vendored
@@ -33,6 +33,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/utils/pointer"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
func TestWatchList(t *testing.T) {
|
||||
@@ -415,7 +416,7 @@ func TestWatchList(t *testing.T) {
|
||||
listWatcher.customListResponse = scenario.podList
|
||||
listWatcher.closeAfterListRequests = scenario.closeAfterListRequests
|
||||
if scenario.disableUseWatchList {
|
||||
reflector.UseWatchList = false
|
||||
reflector.UseWatchList = ptr.To(false)
|
||||
}
|
||||
|
||||
err := reflector.ListAndWatch(stopCh)
|
||||
@@ -505,7 +506,7 @@ func testData() (*fakeListWatcher, Store, *Reflector, chan struct{}) {
|
||||
},
|
||||
}
|
||||
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||
r.UseWatchList = true
|
||||
r.UseWatchList = ptr.To(true)
|
||||
|
||||
return lw, s, r, stopCh
|
||||
}
|
||||
|
Reference in New Issue
Block a user