From 31d404b182d2985ce0d3c43f75d80c29a708beda Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Tue, 27 Feb 2024 11:25:42 +0100
Subject: [PATCH] Prevent watch cache starvation by moving its watch to a
 separate RPC, and add a SeparateCacheWatchRPC feature flag to disable this
 behavior

---
 pkg/features/kube_features.go                 |  2 ++
 .../apiserver/pkg/features/kube_features.go   |  9 ++++++
 .../apiserver/pkg/storage/cacher/cacher.go    | 13 ++++++--
 .../pkg/storage/cacher/lister_watcher.go      | 30 +++++++++++++------
 .../pkg/storage/cacher/lister_watcher_test.go |  4 +--
 .../pkg/storage/cacher/watch_cache_test.go    |  2 +-
 .../pkg/storage/cacher/watch_progress.go      |  9 +++++-
 .../pkg/storage/cacher/watch_progress_test.go |  2 +-
 8 files changed, 55 insertions(+), 16 deletions(-)

diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go
index 94ea3460aa9..11dd299bf73 100644
--- a/pkg/features/kube_features.go
+++ b/pkg/features/kube_features.go
@@ -1213,6 +1213,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 	genericfeatures.RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
 
+	genericfeatures.SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},
+
 	genericfeatures.ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
 
 	genericfeatures.ServerSideFieldValidation: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
 
diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go
index 5b486ba20d0..49894cc1c6d 100644
--- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go
+++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go
@@ -170,6 +170,13 @@ const (
 	// to a chunking list request.
 	RemainingItemCount featuregate.Feature = "RemainingItemCount"
 
+	// owner: @serathius
+	// beta: v1.30
+	//
+	// Allow watch cache to create a watch on a dedicated RPC.
+	// This prevents watch cache from being starved by other watches.
+	SeparateCacheWatchRPC featuregate.Feature = "SeparateCacheWatchRPC"
+
 	// owner: @apelisse, @lavalamp
 	// alpha: v1.14
 	// beta: v1.16
@@ -319,6 +326,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 	RetryGenerateName: {Default: false, PreRelease: featuregate.Alpha},
 
+	SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},
+
 	ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
 
 	ServerSideFieldValidation: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
 
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index de25c2e5c74..70e27daf06e 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"go.opentelemetry.io/otel/attribute"
+	"google.golang.org/grpc/metadata"
 
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
@@ -397,10 +398,18 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 		// so that future reuse does not get a spurious timeout.
 		<-cacher.timer.C
 	}
-	progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock)
+	var contextMetadata metadata.MD
+	if utilfeature.DefaultFeatureGate.Enabled(features.SeparateCacheWatchRPC) {
+		// Add gRPC context metadata to the watch and progress-notify requests made by the cacher to:
+		// * Prevent starvation of the watch opened by the cacher, by moving it to a separate Watch RPC from the watch requests that bypass the cacher.
+		// * Ensure that progress notification requests are executed on the same Watch RPC as their watch, which is required for them to work.
+		contextMetadata = metadata.New(map[string]string{"source": "cache"})
+	}
+
+	progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
 	watchCache := newWatchCache(
 		config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
-	listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
+	listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc, contextMetadata)
 	reflectorName := "storage/cacher.go:" + config.ResourcePrefix
 
 	reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go
index 1252e5e3495..2817a93dd0c 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go
@@ -19,6 +19,8 @@ package cacher
 import (
 	"context"
 
+	"google.golang.org/grpc/metadata"
+
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
@@ -30,17 +32,19 @@
 
 // listerWatcher opaques storage.Interface to expose cache.ListerWatcher.
 type listerWatcher struct {
-	storage        storage.Interface
-	resourcePrefix string
-	newListFunc    func() runtime.Object
+	storage         storage.Interface
+	resourcePrefix  string
+	newListFunc     func() runtime.Object
+	contextMetadata metadata.MD
 }
 
 // NewListerWatcher returns a storage.Interface backed ListerWatcher.
-func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
+func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object, contextMetadata metadata.MD) cache.ListerWatcher {
 	return &listerWatcher{
-		storage:        storage,
-		resourcePrefix: resourcePrefix,
-		newListFunc:    newListFunc,
+		storage:         storage,
+		resourcePrefix:  resourcePrefix,
+		newListFunc:     newListFunc,
+		contextMetadata: contextMetadata,
 	}
 }
 
@@ -59,7 +63,11 @@ func (lw *listerWatcher) List(options metav1.ListOptions) (runtime.Object, error
 		Predicate: pred,
 		Recursive: true,
 	}
-	if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil {
+	ctx := context.Background()
+	if lw.contextMetadata != nil {
+		ctx = metadata.NewOutgoingContext(ctx, lw.contextMetadata)
+	}
+	if err := lw.storage.GetList(ctx, lw.resourcePrefix, storageOpts, list); err != nil {
 		return nil, err
 	}
 	return list, nil
@@ -73,5 +81,9 @@ func (lw *listerWatcher) Watch(options metav1.ListOptions) (watch.Interface, err
 		Recursive:      true,
 		ProgressNotify: true,
 	}
-	return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts)
+	ctx := context.Background()
+	if lw.contextMetadata != nil {
+		ctx = metadata.NewOutgoingContext(ctx, lw.contextMetadata)
+	}
+	return lw.storage.Watch(ctx, lw.resourcePrefix, opts)
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go
index 6c89ee3038e..a5a279cb699 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go
@@ -44,7 +44,7 @@ func TestCacherListerWatcher(t *testing.T) {
 		}
 	}
 
-	lw := NewListerWatcher(store, prefix, fn)
+	lw := NewListerWatcher(store, prefix, fn, nil)
 
 	obj, err := lw.List(metav1.ListOptions{})
 	if err != nil {
@@ -80,7 +80,7 @@ func TestCacherListerWatcherPagination(t *testing.T) {
 		}
 	}
 
-	lw := NewListerWatcher(store, prefix, fn)
+	lw := NewListerWatcher(store, prefix, fn, nil)
 
 	obj1, err := lw.List(metav1.ListOptions{Limit: 2})
 	if err != nil {
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
index 8e37e0cf83e..8e335ec1b5b 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
@@ -122,7 +122,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
 	wc := &testWatchCache{}
 	wc.bookmarkRevision = make(chan int64, 1)
 	wc.stopCh = make(chan struct{})
-	pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{})
+	pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil)
 	go pr.Run(wc.stopCh)
 	wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"}, pr)
 	// To preserve behavior of tests that assume a given capacity,
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
index f44ca9325b8..13f50bc187d 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
@@ -21,6 +21,8 @@ import (
 	"sync"
 	"time"
 
+	"google.golang.org/grpc/metadata"
"google.golang.org/grpc/metadata" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -34,10 +36,11 @@ const ( progressRequestPeriod = 100 * time.Millisecond ) -func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory) *conditionalProgressRequester { +func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *conditionalProgressRequester { pr := &conditionalProgressRequester{ clock: clock, requestWatchProgress: requestWatchProgress, + contextMetadata: contextMetadata, } pr.cond = sync.NewCond(pr.mux.RLocker()) return pr @@ -54,6 +57,7 @@ type TickerFactory interface { type conditionalProgressRequester struct { clock TickerFactory requestWatchProgress WatchProgressRequester + contextMetadata metadata.MD mux sync.RWMutex cond *sync.Cond @@ -63,6 +67,9 @@ type conditionalProgressRequester struct { func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) { ctx := wait.ContextForChannel(stopCh) + if pr.contextMetadata != nil { + ctx = metadata.NewOutgoingContext(ctx, pr.contextMetadata) + } go func() { defer utilruntime.HandleCrash() <-stopCh diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go index e004ed35125..57c1f23d926 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go @@ -115,7 +115,7 @@ func TestConditionalProgressRequester(t *testing.T) { func newTestConditionalProgressRequester(clock clock.WithTicker) *testConditionalProgressRequester { pr := &testConditionalProgressRequester{} - pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock) + pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock, nil) return pr }