diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 1e6cc4d021a..bb74ed3a6ee 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -42,6 +42,7 @@ import ( "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/cacher/metrics" + etcdfeature "k8s.io/apiserver/pkg/storage/feature" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" "k8s.io/component-base/tracing" @@ -728,9 +729,10 @@ func shouldDelegateList(opts storage.ListOptions) bool { pred := opts.Predicate match := opts.ResourceVersionMatch consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) + requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) // Serve consistent reads from storage if ConsistentListFromCache is disabled - consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled + consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported) // Watch cache doesn't support continuations, so serve them from etcd. hasContinuation := len(pred.Continue) > 0 // Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd. @@ -773,7 +775,8 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio // minimal resource version, simply forward the request to storage. return c.storage.GetList(ctx, key, opts, listObj) } - if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) { + requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) + if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported { listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String()) if err != nil { return err diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 9c4d93f176a..26eaf686078 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -33,6 +33,7 @@ import ( "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/cacher/metrics" + etcdfeature "k8s.io/apiserver/pkg/storage/feature" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" "k8s.io/component-base/tracing" @@ -498,7 +499,8 @@ func (s sortableStoreElements) Swap(i, j int) { // WaitUntilFreshAndList returns list of pointers to `storeElement` objects along // with their ResourceVersion and the name of the index, if any, that was used. func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) { - if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && w.notFresh(resourceVersion) { + requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) + if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) { w.waitingUntilFresh.Add() err = w.waitUntilFreshAndBlock(ctx, resourceVersion) w.waitingUntilFresh.Remove() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index ed6ebe4669f..6b1d2d2cab4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -39,9 +39,12 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/audit" endpointsrequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3/metrics" + etcdfeature "k8s.io/apiserver/pkg/storage/feature" "k8s.io/apiserver/pkg/storage/value" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/tracing" "k8s.io/klog/v2" ) @@ -139,6 +142,9 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) { return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType) } + if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) || utilfeature.DefaultFeatureGate.Enabled(features.WatchList) { + etcdfeature.DefaultFeatureSupportChecker.CheckClient(c.Ctx(), c, storage.RequestWatchProgress) + } return s } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/feature/feature_support_checker.go b/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go similarity index 68% rename from staging/src/k8s.io/apiserver/pkg/storage/etcd3/feature/feature_support_checker.go rename to staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go index ddf77b8e8b0..77f5ab05c9c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/feature/feature_support_checker.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go @@ -20,9 +20,12 @@ import ( "context" "fmt" "sync" + "time" clientv3 "go.etcd.io/etcd/client/v3" + "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/version" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/storage" "k8s.io/klog/v2" "k8s.io/utils/ptr" @@ -43,86 +46,100 @@ type FeatureSupportChecker interface { // Supports check if the feature is supported or not by checking internal cache. // By default all calls to this function before calling CheckClient returns false. // Returns true if all endpoints in etcd clients are supporting the feature. - Supports(feature storage.Feature) (bool, error) + // If client A supports and client B doesn't support the feature, the `Supports` will + // first return true at client A initializtion and then return false on client B + // initialzation, it can flip the support at runtime. + Supports(feature storage.Feature) bool // CheckClient works with etcd client to recalcualte feature support and cache it internally. // All etcd clients should support feature to cause `Supports` return true. // If client A supports and client B doesn't support the feature, the `Supports` will // first return true at client A initializtion and then return false on client B // initialzation, it can flip the support at runtime. - CheckClient(ctx context.Context, c client, feature storage.Feature) error + CheckClient(ctx context.Context, c client, feature storage.Feature) } type defaultFeatureSupportChecker struct { - lock sync.Mutex - progressNotifySupported *bool - progresNotifyEndpointCache map[string]bool + lock sync.Mutex + progressNotifySupported *bool + checkingEndpoint map[string]struct{} } func newDefaultFeatureSupportChecker() *defaultFeatureSupportChecker { return &defaultFeatureSupportChecker{ - progresNotifyEndpointCache: make(map[string]bool), + checkingEndpoint: make(map[string]struct{}), } } // Supports can check the featue from anywhere without storage if it was cached before. -func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) (bool, error) { +func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) bool { switch feature { case storage.RequestWatchProgress: f.lock.Lock() defer f.lock.Unlock() - return ptr.Deref(f.progressNotifySupported, false), nil + return ptr.Deref(f.progressNotifySupported, false) default: - return false, fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature) + runtime.HandleError(fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature)) + return false } } // CheckClient accepts client and calculate the support per endpoint and caches it. -// It will return at any point if error happens or one endpoint is not supported. -func (f *defaultFeatureSupportChecker) CheckClient(ctx context.Context, c client, feature storage.Feature) error { +func (f *defaultFeatureSupportChecker) CheckClient(ctx context.Context, c client, feature storage.Feature) { switch feature { case storage.RequestWatchProgress: - return f.clientSupportsRequestWatchProgress(ctx, c) + f.checkClient(ctx, c) default: - return fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature) - + runtime.HandleError(fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature)) } } -func (f *defaultFeatureSupportChecker) clientSupportsRequestWatchProgress(ctx context.Context, c client) error { +func (f *defaultFeatureSupportChecker) checkClient(ctx context.Context, c client) { + // start with 10 ms, multiply by 2 each step, until 15 s and stays on 15 seconds. + delayFunc := wait.Backoff{ + Duration: 10 * time.Millisecond, + Cap: 15 * time.Second, + Factor: 2.0, + Steps: 11}.DelayFunc() + f.lock.Lock() + defer f.lock.Unlock() + for _, ep := range c.Endpoints() { + if _, found := f.checkingEndpoint[ep]; found { + continue + } + f.checkingEndpoint[ep] = struct{}{} + go func(ep string) { + defer runtime.HandleCrash() + err := delayFunc.Until(ctx, true, true, func(ctx context.Context) (done bool, err error) { + internalErr := f.clientSupportsRequestWatchProgress(ctx, c, ep) + return internalErr == nil, nil + }) + if err != nil { + klog.ErrorS(err, "Failed to check if RequestWatchProgress is supported by etcd after retrying") + } + }(ep) + } +} + +func (f *defaultFeatureSupportChecker) clientSupportsRequestWatchProgress(ctx context.Context, c client, ep string) error { + supported, err := endpointSupportsRequestWatchProgress(ctx, c, ep) + if err != nil { + return err + } f.lock.Lock() defer f.lock.Unlock() - for _, ep := range c.Endpoints() { - supported, err := f.supportsProgressNotifyEndpointLocked(ctx, c, ep) - if err != nil { - return err - } - if !supported { - f.progressNotifySupported = ptr.To(false) - return nil - } + if !supported { + klog.Infof("RequestWatchProgress feature is not supported by %q endpoint", ep) + f.progressNotifySupported = ptr.To(false) + return nil } - if f.progressNotifySupported == nil && len(c.Endpoints()) > 0 { + if f.progressNotifySupported == nil { f.progressNotifySupported = ptr.To(true) } return nil } -func (f *defaultFeatureSupportChecker) supportsProgressNotifyEndpointLocked(ctx context.Context, c client, ep string) (bool, error) { - if supported, ok := f.progresNotifyEndpointCache[ep]; ok { - return supported, nil - } - - supported, err := endpointSupportsRequestWatchProgress(ctx, c, ep) - if err != nil { - return false, err - } - - f.progresNotifyEndpointCache[ep] = supported - return supported, nil -} - // Sub interface of etcd client. type client interface { // Endpoints returns list of endpoints in etcd client. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/feature/feature_support_checker_test.go b/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker_test.go similarity index 88% rename from staging/src/k8s.io/apiserver/pkg/storage/etcd3/feature/feature_support_checker_test.go rename to staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker_test.go index c800dfd5733..2e4260e821d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/feature/feature_support_checker_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker_test.go @@ -76,20 +76,17 @@ func TestSupports(t *testing.T) { testName string featureName string expectedResult bool - expectedError error }{ { - testName: "Error with unknown feature", - featureName: "some unknown feature", - expectedError: fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", "some unknown feature"), + testName: "Disabled - with unknown feature", + featureName: "some unknown feature", }, { - testName: "Error with empty feature", - featureName: "", - expectedError: fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", ""), + testName: "Disabled - with empty feature", + featureName: "", }, { - testName: "No error but disabled by default", + testName: "Disabled - default", featureName: storage.RequestWatchProgress, expectedResult: false, }, @@ -99,10 +96,9 @@ func TestSupports(t *testing.T) { t.Run(tt.testName, func(t *testing.T) { var testFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker() - supported, err := testFeatureSupportChecker.Supports(tt.featureName) + supported := testFeatureSupportChecker.Supports(tt.featureName) assert.Equal(t, tt.expectedResult, supported) - assert.Equal(t, tt.expectedError, err) }) } } @@ -254,18 +250,19 @@ func TestSupportsRequestWatchProgress(t *testing.T) { } for _, tt := range tests { t.Run(tt.testName, func(t *testing.T) { - var testFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker() + var testFeatureSupportChecker = newDefaultFeatureSupportChecker() for _, round := range tt.rounds { // Mock Etcd client mockClient := &MockEtcdClient{EndpointVersion: round.endpointsVersion} ctx := context.Background() - err := testFeatureSupportChecker.CheckClient(ctx, mockClient, storage.RequestWatchProgress) - assert.Equal(t, err, round.expectedError) + for _, ep := range mockClient.Endpoints() { + err := testFeatureSupportChecker.clientSupportsRequestWatchProgress(ctx, mockClient, ep) + assert.Equal(t, round.expectedError, err) + } - // Error of Supports already tested in TestSupports. - supported, _ := testFeatureSupportChecker.Supports(storage.RequestWatchProgress) - assert.Equal(t, supported, round.expectedResult) + supported := testFeatureSupportChecker.Supports(storage.RequestWatchProgress) + assert.Equal(t, round.expectedResult, supported) } }) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go index 6b941cb7fe1..d880469659c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go @@ -25,6 +25,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/storage" + etcdfeature "k8s.io/apiserver/pkg/storage/feature" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" ) @@ -165,9 +167,10 @@ func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool { resourceVersion := opts.ResourceVersion match := opts.ResourceVersionMatch consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) + requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) // Serve consistent reads from storage if ConsistentListFromCache is disabled - consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled + consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported) // Watch cache doesn't support continuations, so serve them from etcd. hasContinuation := len(opts.Continue) > 0 // Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.