Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-14 22:33:34 +00:00)

Merge pull request #123532 from serathius/separate-rpc

Move cacher watch to separate rpc preventing starvation

This commit is contained in: 5b6d8a4293
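Background for the diff below: the change relies on how the etcd client multiplexes watches. clientv3 groups Watch calls onto shared gRPC Watch streams keyed by the outgoing context metadata, so attaching distinct metadata to the cacher's own watch moves it onto a dedicated RPC that user watches bypassing the cache cannot starve. A minimal sketch of that behavior, assuming a local etcd endpoint; the key and metadata value here are illustrative, not taken from this PR:

package main

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc/metadata"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Watch opened with a plain context: multiplexed onto the default Watch stream.
	direct := cli.Watch(context.Background(), "/registry/pods/", clientv3.WithPrefix())

	// Watch opened with distinct outgoing metadata: the client keys streams by
	// context metadata, so this watch should land on its own Watch RPC.
	cacheCtx := metadata.NewOutgoingContext(context.Background(),
		metadata.New(map[string]string{"source": "cache"}))
	cached := cli.Watch(cacheCtx, "/registry/pods/", clientv3.WithPrefix())

	_, _ = direct, cached
}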
@@ -1224,6 +1224,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
 	genericfeatures.RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
 
+	genericfeatures.SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},
+
 	genericfeatures.ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
 
 	genericfeatures.ServerSideFieldValidation: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
 
@@ -170,6 +170,13 @@ const (
 	// to a chunking list request.
 	RemainingItemCount featuregate.Feature = "RemainingItemCount"
 
+	// owner: @serathius
+	// beta: v1.30
+	//
+	// Allow watch cache to create a watch on a dedicated RPC.
+	// This prevents watch cache from being starved by other watches.
+	SeparateCacheWatchRPC featuregate.Feature = "SeparateCacheWatchRPC"
+
 	// owner: @apelisse, @lavalamp
 	// alpha: v1.14
 	// beta: v1.16
@@ -319,6 +326,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
 	RetryGenerateName: {Default: false, PreRelease: featuregate.Alpha},
 
+	SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},
+
 	ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
 
 	ServerSideFieldValidation: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29
 
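Both registrations default the gate to on at Beta; operators can still opt out with the standard --feature-gates=SeparateCacheWatchRPC=false flag. Consumers consult the gate at runtime, as the cacher.go hunk further down does. A minimal sketch of such a check, assuming the usual apiserver feature-gate helpers; the function name is illustrative:

import (
	genericfeatures "k8s.io/apiserver/pkg/features"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
)

// separateWatchRPCEnabled reports whether cacher-originated requests should
// carry dedicated gRPC metadata (and hence use a separate Watch RPC).
func separateWatchRPCEnabled() bool {
	return utilfeature.DefaultFeatureGate.Enabled(genericfeatures.SeparateCacheWatchRPC)
}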
@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"go.opentelemetry.io/otel/attribute"
+	"google.golang.org/grpc/metadata"
 
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
@@ -397,10 +398,18 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 		// so that future reuse does not get a spurious timeout.
 		<-cacher.timer.C
 	}
-	progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock)
+	var contextMetadata metadata.MD
+	if utilfeature.DefaultFeatureGate.Enabled(features.SeparateCacheWatchRPC) {
+		// Add grpc context metadata to watch and progress notify requests done by cacher to:
+		// * Prevent starvation of watch opened by cacher, by moving it to separate Watch RPC than watch request that bypass cacher.
+		// * Ensure that progress notification requests are executed on the same Watch RPC as their watch, which is required for it to work.
+		contextMetadata = metadata.New(map[string]string{"source": "cache"})
+	}
+
+	progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
 	watchCache := newWatchCache(
 		config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
-	listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
+	listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc, contextMetadata)
 	reflectorName := "storage/cacher.go:" + config.ResourcePrefix
 
 	reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
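The metadata built here travels with every request the cacher issues: grpc's metadata package stores it on the context, and the transport attaches it to each outgoing RPC. A small self-contained round-trip sketch; the "source": "cache" key mirrors the value set in the hunk above:

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

func main() {
	// Build the same metadata the cacher attaches when the gate is enabled.
	md := metadata.New(map[string]string{"source": "cache"})
	ctx := metadata.NewOutgoingContext(context.Background(), md)

	// Any RPC issued with ctx carries this metadata; it is also what the etcd
	// client inspects when picking a Watch stream for the request.
	if out, ok := metadata.FromOutgoingContext(ctx); ok {
		fmt.Println(out.Get("source")) // [cache]
	}
}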
@@ -29,6 +29,7 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc/metadata"
 
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -2345,3 +2346,128 @@ func TestGetBookmarkAfterResourceVersionLockedFunc(t *testing.T) {
 		})
 	}
 }
+
+func TestWatchStreamSeparation(t *testing.T) {
+	tcs := []struct {
+		name                         string
+		separateCacheWatchRPC        bool
+		useWatchCacheContextMetadata bool
+		expectBookmarkOnWatchCache   bool
+		expectBookmarkOnEtcd         bool
+	}{
+		{
+			name:                       "common RPC > both get bookmarks",
+			separateCacheWatchRPC:      false,
+			expectBookmarkOnEtcd:       true,
+			expectBookmarkOnWatchCache: true,
+		},
+		{
+			name:                         "common RPC & watch cache context > both get bookmarks",
+			separateCacheWatchRPC:        false,
+			useWatchCacheContextMetadata: true,
+			expectBookmarkOnEtcd:         true,
+			expectBookmarkOnWatchCache:   true,
+		},
+		{
+			name:                       "separate RPC > only etcd gets bookmarks",
+			separateCacheWatchRPC:      true,
+			expectBookmarkOnEtcd:       true,
+			expectBookmarkOnWatchCache: false,
+		},
+		{
+			name:                         "separate RPC & watch cache context > only watch cache gets bookmarks",
+			separateCacheWatchRPC:        true,
+			useWatchCacheContextMetadata: true,
+			expectBookmarkOnEtcd:         false,
+			expectBookmarkOnWatchCache:   true,
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SeparateCacheWatchRPC, tc.separateCacheWatchRPC)()
+			_, cacher, _, terminate := testSetupWithEtcdServer(t)
+			t.Cleanup(terminate)
+			if err := cacher.ready.wait(context.TODO()); err != nil {
+				t.Fatalf("unexpected error waiting for the cache to be ready")
+			}
+
+			getCacherRV := func() uint64 {
+				cacher.watchCache.RLock()
+				defer cacher.watchCache.RUnlock()
+				return cacher.watchCache.resourceVersion
+			}
+			waitContext, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+			defer cancel()
+			waitForEtcdBookmark := watchAndWaitForBookmark(t, waitContext, cacher.storage)
+
+			var out example.Pod
+			err := cacher.Create(context.Background(), "foo", &example.Pod{}, &out, 0)
+			if err != nil {
+				t.Fatal(err)
+			}
+			versioner := storage.APIObjectVersioner{}
+			var lastResourceVersion uint64
+			lastResourceVersion, err = versioner.ObjectResourceVersion(&out)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			var contextMetadata metadata.MD
+			if tc.useWatchCacheContextMetadata {
+				contextMetadata = cacher.watchCache.waitingUntilFresh.contextMetadata
+			}
+			// Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507
+			// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove sleep when etcd is upgraded to version with fix.
+			time.Sleep(time.Second)
+			err = cacher.storage.RequestWatchProgress(metadata.NewOutgoingContext(context.Background(), contextMetadata))
+			if err != nil {
+				t.Fatal(err)
+			}
+			// Give time for bookmark to arrive
+			time.Sleep(time.Second)
+
+			etcdWatchResourceVersion := waitForEtcdBookmark()
+			gotEtcdWatchBookmark := etcdWatchResourceVersion == lastResourceVersion
+			if gotEtcdWatchBookmark != tc.expectBookmarkOnEtcd {
+				t.Errorf("Unexpected etcd bookmark check result, rv: %d, got: %v, want: %v", etcdWatchResourceVersion, gotEtcdWatchBookmark, tc.expectBookmarkOnEtcd)
+			}
+
+			watchCacheResourceVersion := getCacherRV()
+			cacherGotBookmark := watchCacheResourceVersion == lastResourceVersion
+			if cacherGotBookmark != tc.expectBookmarkOnWatchCache {
+				t.Errorf("Unexpected watch cache bookmark check result, rv: %d, got: %v, want: %v", watchCacheResourceVersion, cacherGotBookmark, tc.expectBookmarkOnWatchCache)
+			}
+		})
+	}
+}
+
+func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) {
+	opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true}
+	opts.Predicate.AllowWatchBookmarks = true
+	w, err := etcdStorage.Watch(ctx, "/pods/", opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	versioner := storage.APIObjectVersioner{}
+	var rv uint64
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for event := range w.ResultChan() {
+			if event.Type == watch.Bookmark {
+				rv, err = versioner.ObjectResourceVersion(event.Object)
+				break
+			}
+		}
+	}()
+	return func() (resourceVersion uint64) {
+		defer w.Stop()
+		wg.Wait()
+		if err != nil {
+			t.Fatal(err)
+		}
+		return rv
+	}
+}
@@ -19,6 +19,8 @@ package cacher
 import (
 	"context"
 
+	"google.golang.org/grpc/metadata"
+
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
@@ -30,17 +32,19 @@ import (
 
 // listerWatcher opaques storage.Interface to expose cache.ListerWatcher.
 type listerWatcher struct {
-	storage        storage.Interface
-	resourcePrefix string
-	newListFunc    func() runtime.Object
+	storage         storage.Interface
+	resourcePrefix  string
+	newListFunc     func() runtime.Object
+	contextMetadata metadata.MD
 }
 
 // NewListerWatcher returns a storage.Interface backed ListerWatcher.
-func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
+func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object, contextMetadata metadata.MD) cache.ListerWatcher {
 	return &listerWatcher{
-		storage:        storage,
-		resourcePrefix: resourcePrefix,
-		newListFunc:    newListFunc,
+		storage:         storage,
+		resourcePrefix:  resourcePrefix,
+		newListFunc:     newListFunc,
+		contextMetadata: contextMetadata,
 	}
 }
 
@@ -59,7 +63,11 @@ func (lw *listerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
 		Predicate: pred,
 		Recursive: true,
 	}
-	if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil {
+	ctx := context.Background()
+	if lw.contextMetadata != nil {
+		ctx = metadata.NewOutgoingContext(ctx, lw.contextMetadata)
+	}
+	if err := lw.storage.GetList(ctx, lw.resourcePrefix, storageOpts, list); err != nil {
 		return nil, err
 	}
 	return list, nil
@@ -73,5 +81,9 @@ func (lw *listerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
 		Recursive:      true,
 		ProgressNotify: true,
 	}
-	return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts)
+	ctx := context.Background()
+	if lw.contextMetadata != nil {
+		ctx = metadata.NewOutgoingContext(ctx, lw.contextMetadata)
+	}
+	return lw.storage.Watch(ctx, lw.resourcePrefix, opts)
 }
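With the extra parameter, one listerWatcher serves both configurations: nil metadata keeps the previous plain-context behavior, while the cacher's metadata routes both its LIST and its WATCH onto the tagged context. A usage sketch, assuming store (a storage.Interface), prefix, and newListFunc are in scope as in the tests below:

// nil metadata: requests use a plain context, matching the old behavior.
lw := NewListerWatcher(store, prefix, newListFunc, nil)

// Tagged metadata: List and Watch both derive their context from it.
md := metadata.New(map[string]string{"source": "cache"})
lwTagged := NewListerWatcher(store, prefix, newListFunc, md)

w, err := lwTagged.Watch(metav1.ListOptions{ResourceVersion: "0"})
if err != nil {
	// handle error
}
defer w.Stop()
_ = lw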
@@ -44,7 +44,7 @@ func TestCacherListerWatcher(t *testing.T) {
 		}
 	}
 
-	lw := NewListerWatcher(store, prefix, fn)
+	lw := NewListerWatcher(store, prefix, fn, nil)
 
 	obj, err := lw.List(metav1.ListOptions{})
 	if err != nil {
@@ -80,7 +80,7 @@ func TestCacherListerWatcherPagination(t *testing.T) {
 		}
 	}
 
-	lw := NewListerWatcher(store, prefix, fn)
+	lw := NewListerWatcher(store, prefix, fn, nil)
 
 	obj1, err := lw.List(metav1.ListOptions{Limit: 2})
 	if err != nil {
|
@ -125,7 +125,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
|
|||||||
wc := &testWatchCache{}
|
wc := &testWatchCache{}
|
||||||
wc.bookmarkRevision = make(chan int64, 1)
|
wc.bookmarkRevision = make(chan int64, 1)
|
||||||
wc.stopCh = make(chan struct{})
|
wc.stopCh = make(chan struct{})
|
||||||
pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{})
|
pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil)
|
||||||
go pr.Run(wc.stopCh)
|
go pr.Run(wc.stopCh)
|
||||||
wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"}, pr)
|
wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"}, pr)
|
||||||
// To preserve behavior of tests that assume a given capacity,
|
// To preserve behavior of tests that assume a given capacity,
|
||||||
|
@@ -21,6 +21,8 @@ import (
 	"sync"
 	"time"
 
+	"google.golang.org/grpc/metadata"
+
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 
@@ -34,10 +36,11 @@ const (
 	progressRequestPeriod = 100 * time.Millisecond
 )
 
-func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory) *conditionalProgressRequester {
+func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *conditionalProgressRequester {
 	pr := &conditionalProgressRequester{
 		clock:                clock,
 		requestWatchProgress: requestWatchProgress,
+		contextMetadata:      contextMetadata,
 	}
 	pr.cond = sync.NewCond(pr.mux.RLocker())
 	return pr
@@ -54,6 +57,7 @@ type TickerFactory interface {
 type conditionalProgressRequester struct {
 	clock                TickerFactory
 	requestWatchProgress WatchProgressRequester
+	contextMetadata      metadata.MD
 
 	mux  sync.RWMutex
 	cond *sync.Cond
@@ -63,6 +67,9 @@ type conditionalProgressRequester struct {
 
 func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
 	ctx := wait.ContextForChannel(stopCh)
+	if pr.contextMetadata != nil {
+		ctx = metadata.NewOutgoingContext(ctx, pr.contextMetadata)
+	}
 	go func() {
 		defer utilruntime.HandleCrash()
 		<-stopCh
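Deriving ctx once in Run means every progress request the loop issues carries the same metadata as the cacher's watch, which is what keeps the resulting bookmark on that stream. Expressed directly against etcd, the pairing looks roughly like this; a sketch only, assuming a clientv3 client cli, and the metadata value merely has to match the watch's:

md := metadata.New(map[string]string{"source": "cache"})
ctx := metadata.NewOutgoingContext(context.Background(), md)

// Watch and progress request share outgoing metadata, hence the same Watch
// RPC; the progress bookmark is delivered on this stream rather than another.
wch := cli.Watch(ctx, "/registry/pods/", clientv3.WithPrefix(), clientv3.WithProgressNotify())
if err := cli.RequestProgress(ctx); err != nil {
	// handle error
}
_ = wch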
@@ -115,7 +115,7 @@ func TestConditionalProgressRequester(t *testing.T) {
 
 func newTestConditionalProgressRequester(clock clock.WithTicker) *testConditionalProgressRequester {
 	pr := &testConditionalProgressRequester{}
-	pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock)
+	pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock, nil)
 	return pr
 }
 
|
Loading…
Reference in New Issue
Block a user