diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go
index 4841689209a..8a63bfc8a0d 100644
--- a/pkg/features/kube_features.go
+++ b/pkg/features/kube_features.go
@@ -1224,6 +1224,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 	genericfeatures.RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
 
+	genericfeatures.SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},
+
 	genericfeatures.ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
 
 	genericfeatures.ServerSideFieldValidation: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go
index 5b486ba20d0..49894cc1c6d 100644
--- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go
+++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go
@@ -170,6 +170,13 @@ const (
 	// to a chunking list request.
 	RemainingItemCount featuregate.Feature = "RemainingItemCount"
 
+	// owner: @serathius
+	// beta: v1.30
+	//
+	// Allow watch cache to create a watch on a dedicated RPC.
+	// This prevents watch cache from being starved by other watches.
+	SeparateCacheWatchRPC featuregate.Feature = "SeparateCacheWatchRPC"
+
 	// owner: @apelisse, @lavalamp
 	// alpha: v1.14
 	// beta: v1.16
@@ -319,6 +326,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 	RetryGenerateName: {Default: false, PreRelease: featuregate.Alpha},
 
+	SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},
+
 	ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
 
 	ServerSideFieldValidation: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index de25c2e5c74..70e27daf06e 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"go.opentelemetry.io/otel/attribute"
+	"google.golang.org/grpc/metadata"
 
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
@@ -397,10 +398,18 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 		// so that future reuse does not get a spurious timeout.
 		<-cacher.timer.C
 	}
-	progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock)
+	var contextMetadata metadata.MD
+	if utilfeature.DefaultFeatureGate.Enabled(features.SeparateCacheWatchRPC) {
+		// Add grpc context metadata to watch and progress notify requests done by the cacher to:
+		// * Prevent starvation of the watch opened by the cacher, by moving it to a separate Watch RPC from the watch requests that bypass the cacher.
+		// * Ensure that progress notification requests are executed on the same Watch RPC as their watch, which is required for them to work.
+		contextMetadata = metadata.New(map[string]string{"source": "cache"})
+	}
+
+	progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
 	watchCache := newWatchCache(
 		config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
-	listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
+	listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc, contextMetadata)
 	reflectorName := "storage/cacher.go:" + config.ResourcePrefix
 
 	reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
index 8a44802d86b..414307e1a0a 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
@@ -29,6 +29,7 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc/metadata"
 
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -2345,3 +2346,128 @@ func TestGetBookmarkAfterResourceVersionLockedFunc(t *testing.T) {
 		})
 	}
 }
+
+func TestWatchStreamSeparation(t *testing.T) {
+	tcs := []struct {
+		name                         string
+		separateCacheWatchRPC        bool
+		useWatchCacheContextMetadata bool
+		expectBookmarkOnWatchCache   bool
+		expectBookmarkOnEtcd         bool
+	}{
+		{
+			name:                       "common RPC > both get bookmarks",
+			separateCacheWatchRPC:      false,
+			expectBookmarkOnEtcd:       true,
+			expectBookmarkOnWatchCache: true,
+		},
+		{
+			name:                         "common RPC & watch cache context > both get bookmarks",
+			separateCacheWatchRPC:        false,
+			useWatchCacheContextMetadata: true,
+			expectBookmarkOnEtcd:         true,
+			expectBookmarkOnWatchCache:   true,
+		},
+		{
+			name:                       "separate RPC > only etcd gets bookmarks",
+			separateCacheWatchRPC:      true,
+			expectBookmarkOnEtcd:       true,
+			expectBookmarkOnWatchCache: false,
+		},
+		{
+			name:                         "separate RPC & watch cache context > only watch cache gets bookmarks",
+			separateCacheWatchRPC:        true,
+			useWatchCacheContextMetadata: true,
+			expectBookmarkOnEtcd:         false,
+			expectBookmarkOnWatchCache:   true,
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SeparateCacheWatchRPC, tc.separateCacheWatchRPC)()
+			_, cacher, _, terminate := testSetupWithEtcdServer(t)
+			t.Cleanup(terminate)
+			if err := cacher.ready.wait(context.TODO()); err != nil {
+				t.Fatalf("unexpected error waiting for the cache to be ready")
+			}
+
+			getCacherRV := func() uint64 {
+				cacher.watchCache.RLock()
+				defer cacher.watchCache.RUnlock()
+				return cacher.watchCache.resourceVersion
+			}
+			waitContext, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+			defer cancel()
+			waitForEtcdBookmark := watchAndWaitForBookmark(t, waitContext, cacher.storage)
+
+			var out example.Pod
+			err := cacher.Create(context.Background(), "foo", &example.Pod{}, &out, 0)
+			if err != nil {
+				t.Fatal(err)
+			}
+			versioner := storage.APIObjectVersioner{}
+			var lastResourceVersion uint64
+			lastResourceVersion, err = versioner.ObjectResourceVersion(&out)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			var contextMetadata metadata.MD
+			if tc.useWatchCacheContextMetadata {
+				contextMetadata = cacher.watchCache.waitingUntilFresh.contextMetadata
+			}
+			// Wait before sending the watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507
+			// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove the sleep when etcd is upgraded to a version with the fix.
+			time.Sleep(time.Second)
+			err = cacher.storage.RequestWatchProgress(metadata.NewOutgoingContext(context.Background(), contextMetadata))
+			if err != nil {
+				t.Fatal(err)
+			}
+			// Give the bookmark time to arrive.
+			time.Sleep(time.Second)
+
+			etcdWatchResourceVersion := waitForEtcdBookmark()
+			gotEtcdWatchBookmark := etcdWatchResourceVersion == lastResourceVersion
+			if gotEtcdWatchBookmark != tc.expectBookmarkOnEtcd {
+				t.Errorf("Unexpected etcd bookmark check result, rv: %d, got: %v, want: %v", etcdWatchResourceVersion, gotEtcdWatchBookmark, tc.expectBookmarkOnEtcd)
+			}
+
+			watchCacheResourceVersion := getCacherRV()
+			cacherGotBookmark := watchCacheResourceVersion == lastResourceVersion
+			if cacherGotBookmark != tc.expectBookmarkOnWatchCache {
+				t.Errorf("Unexpected watch cache bookmark check result, rv: %d, got: %v, want: %v", watchCacheResourceVersion, cacherGotBookmark, tc.expectBookmarkOnWatchCache)
+			}
+		})
+	}
+}
+
+func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) {
+	opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true}
+	opts.Predicate.AllowWatchBookmarks = true
+	w, err := etcdStorage.Watch(ctx, "/pods/", opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	versioner := storage.APIObjectVersioner{}
+	var rv uint64
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for event := range w.ResultChan() {
+			if event.Type == watch.Bookmark {
+				rv, err = versioner.ObjectResourceVersion(event.Object)
+				break
+			}
+		}
+	}()
+	return func() (resourceVersion uint64) {
+		defer w.Stop()
+		wg.Wait()
+		if err != nil {
+			t.Fatal(err)
+		}
+		return rv
+	}
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go
index 1252e5e3495..2817a93dd0c 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go
@@ -19,6 +19,8 @@ package cacher
 import (
 	"context"
 
+	"google.golang.org/grpc/metadata"
+
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
@@ -30,17 +32,19 @@
 
 // listerWatcher opaques storage.Interface to expose cache.ListerWatcher.
 type listerWatcher struct {
-	storage        storage.Interface
-	resourcePrefix string
-	newListFunc    func() runtime.Object
+	storage         storage.Interface
+	resourcePrefix  string
+	newListFunc     func() runtime.Object
+	contextMetadata metadata.MD
 }
 
 // NewListerWatcher returns a storage.Interface backed ListerWatcher.
-func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
+func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object, contextMetadata metadata.MD) cache.ListerWatcher {
 	return &listerWatcher{
-		storage:        storage,
-		resourcePrefix: resourcePrefix,
-		newListFunc:    newListFunc,
+		storage:         storage,
+		resourcePrefix:  resourcePrefix,
+		newListFunc:     newListFunc,
+		contextMetadata: contextMetadata,
 	}
 }
@@ -59,7 +63,11 @@ func (lw *listerWatcher) List(options metav1.ListOptions) (runtime.Object, error
 		Predicate:  pred,
 		Recursive:  true,
 	}
-	if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil {
+	ctx := context.Background()
+	if lw.contextMetadata != nil {
+		ctx = metadata.NewOutgoingContext(ctx, lw.contextMetadata)
+	}
+	if err := lw.storage.GetList(ctx, lw.resourcePrefix, storageOpts, list); err != nil {
 		return nil, err
 	}
 	return list, nil
@@ -73,5 +81,9 @@ func (lw *listerWatcher) Watch(options metav1.ListOptions) (watch.Interface, err
 		Recursive:      true,
 		ProgressNotify: true,
 	}
-	return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts)
+	ctx := context.Background()
+	if lw.contextMetadata != nil {
+		ctx = metadata.NewOutgoingContext(ctx, lw.contextMetadata)
+	}
+	return lw.storage.Watch(ctx, lw.resourcePrefix, opts)
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go
index 6c89ee3038e..a5a279cb699 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go
@@ -44,7 +44,7 @@ func TestCacherListerWatcher(t *testing.T) {
 		}
 	}
 
-	lw := NewListerWatcher(store, prefix, fn)
+	lw := NewListerWatcher(store, prefix, fn, nil)
 
 	obj, err := lw.List(metav1.ListOptions{})
 	if err != nil {
@@ -80,7 +80,7 @@ func TestCacherListerWatcherPagination(t *testing.T) {
 		}
 	}
 
-	lw := NewListerWatcher(store, prefix, fn)
+	lw := NewListerWatcher(store, prefix, fn, nil)
 
 	obj1, err := lw.List(metav1.ListOptions{Limit: 2})
 	if err != nil {
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
index 752c607a083..c9f3a07995a 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
@@ -125,7 +125,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
 	wc := &testWatchCache{}
 	wc.bookmarkRevision = make(chan int64, 1)
 	wc.stopCh = make(chan struct{})
-	pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{})
+	pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil)
 	go pr.Run(wc.stopCh)
 	wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"}, pr)
 	// To preserve behavior of tests that assume a given capacity,
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
index f44ca9325b8..13f50bc187d 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
@@ -21,6 +21,8 @@ import (
 	"sync"
 	"time"
 
+	"google.golang.org/grpc/metadata"
"google.golang.org/grpc/metadata" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -34,10 +36,11 @@ const ( progressRequestPeriod = 100 * time.Millisecond ) -func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory) *conditionalProgressRequester { +func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *conditionalProgressRequester { pr := &conditionalProgressRequester{ clock: clock, requestWatchProgress: requestWatchProgress, + contextMetadata: contextMetadata, } pr.cond = sync.NewCond(pr.mux.RLocker()) return pr @@ -54,6 +57,7 @@ type TickerFactory interface { type conditionalProgressRequester struct { clock TickerFactory requestWatchProgress WatchProgressRequester + contextMetadata metadata.MD mux sync.RWMutex cond *sync.Cond @@ -63,6 +67,9 @@ type conditionalProgressRequester struct { func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) { ctx := wait.ContextForChannel(stopCh) + if pr.contextMetadata != nil { + ctx = metadata.NewOutgoingContext(ctx, pr.contextMetadata) + } go func() { defer utilruntime.HandleCrash() <-stopCh diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go index e004ed35125..57c1f23d926 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go @@ -115,7 +115,7 @@ func TestConditionalProgressRequester(t *testing.T) { func newTestConditionalProgressRequester(clock clock.WithTicker) *testConditionalProgressRequester { pr := &testConditionalProgressRequester{} - pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock) + pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock, nil) return pr }