Merge pull request #124612 from ah8ad3/add-clc-warning

Feat: warn user if etcd version is not supported for RequestWatchProgress feature.
This commit is contained in:
Kubernetes Prow Robot 2024-05-10 07:52:36 -07:00 committed by GitHub
commit d8ed461ba1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 87 additions and 59 deletions

View File

@ -42,6 +42,7 @@ import (
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics" "k8s.io/apiserver/pkg/storage/cacher/metrics"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing" "k8s.io/component-base/tracing"
@ -728,9 +729,10 @@ func shouldDelegateList(opts storage.ListOptions) bool {
pred := opts.Predicate pred := opts.Predicate
match := opts.ResourceVersionMatch match := opts.ResourceVersionMatch
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
// Serve consistent reads from storage if ConsistentListFromCache is disabled // Serve consistent reads from storage if ConsistentListFromCache is disabled
consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
// Watch cache doesn't support continuations, so serve them from etcd. // Watch cache doesn't support continuations, so serve them from etcd.
hasContinuation := len(pred.Continue) > 0 hasContinuation := len(pred.Continue) > 0
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd. // Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
@ -773,7 +775,8 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
// minimal resource version, simply forward the request to storage. // minimal resource version, simply forward the request to storage.
return c.storage.GetList(ctx, key, opts, listObj) return c.storage.GetList(ctx, key, opts, listObj)
} }
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) { requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported {
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String()) listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
if err != nil { if err != nil {
return err return err

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics" "k8s.io/apiserver/pkg/storage/cacher/metrics"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing" "k8s.io/component-base/tracing"
@ -498,7 +499,8 @@ func (s sortableStoreElements) Swap(i, j int) {
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along // WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
// with their ResourceVersion and the name of the index, if any, that was used. // with their ResourceVersion and the name of the index, if any, that was used.
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) { func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && w.notFresh(resourceVersion) { requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add() w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion) err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove() w.waitingUntilFresh.Remove()

View File

@ -39,9 +39,12 @@ import (
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/audit"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request" endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/etcd3/metrics"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
"k8s.io/apiserver/pkg/storage/value" "k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/tracing" "k8s.io/component-base/tracing"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@ -139,6 +142,9 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) { w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType) return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType)
} }
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) || utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
etcdfeature.DefaultFeatureSupportChecker.CheckClient(c.Ctx(), c, storage.RequestWatchProgress)
}
return s return s
} }

View File

@ -20,9 +20,12 @@ import (
"context" "context"
"fmt" "fmt"
"sync" "sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/version" "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
@ -43,86 +46,100 @@ type FeatureSupportChecker interface {
// Supports check if the feature is supported or not by checking internal cache. // Supports check if the feature is supported or not by checking internal cache.
// By default all calls to this function before calling CheckClient returns false. // By default all calls to this function before calling CheckClient returns false.
// Returns true if all endpoints in etcd clients are supporting the feature. // Returns true if all endpoints in etcd clients are supporting the feature.
Supports(feature storage.Feature) (bool, error) // If client A supports and client B doesn't support the feature, the `Supports` will
// first return true at client A initializtion and then return false on client B
// initialzation, it can flip the support at runtime.
Supports(feature storage.Feature) bool
// CheckClient works with etcd client to recalcualte feature support and cache it internally. // CheckClient works with etcd client to recalcualte feature support and cache it internally.
// All etcd clients should support feature to cause `Supports` return true. // All etcd clients should support feature to cause `Supports` return true.
// If client A supports and client B doesn't support the feature, the `Supports` will // If client A supports and client B doesn't support the feature, the `Supports` will
// first return true at client A initializtion and then return false on client B // first return true at client A initializtion and then return false on client B
// initialzation, it can flip the support at runtime. // initialzation, it can flip the support at runtime.
CheckClient(ctx context.Context, c client, feature storage.Feature) error CheckClient(ctx context.Context, c client, feature storage.Feature)
} }
type defaultFeatureSupportChecker struct { type defaultFeatureSupportChecker struct {
lock sync.Mutex lock sync.Mutex
progressNotifySupported *bool progressNotifySupported *bool
progresNotifyEndpointCache map[string]bool checkingEndpoint map[string]struct{}
} }
func newDefaultFeatureSupportChecker() *defaultFeatureSupportChecker { func newDefaultFeatureSupportChecker() *defaultFeatureSupportChecker {
return &defaultFeatureSupportChecker{ return &defaultFeatureSupportChecker{
progresNotifyEndpointCache: make(map[string]bool), checkingEndpoint: make(map[string]struct{}),
} }
} }
// Supports can check the featue from anywhere without storage if it was cached before. // Supports can check the featue from anywhere without storage if it was cached before.
func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) (bool, error) { func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) bool {
switch feature { switch feature {
case storage.RequestWatchProgress: case storage.RequestWatchProgress:
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
return ptr.Deref(f.progressNotifySupported, false), nil return ptr.Deref(f.progressNotifySupported, false)
default: default:
return false, fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature) runtime.HandleError(fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature))
return false
} }
} }
// CheckClient accepts client and calculate the support per endpoint and caches it. // CheckClient accepts client and calculate the support per endpoint and caches it.
// It will return at any point if error happens or one endpoint is not supported. func (f *defaultFeatureSupportChecker) CheckClient(ctx context.Context, c client, feature storage.Feature) {
func (f *defaultFeatureSupportChecker) CheckClient(ctx context.Context, c client, feature storage.Feature) error {
switch feature { switch feature {
case storage.RequestWatchProgress: case storage.RequestWatchProgress:
return f.clientSupportsRequestWatchProgress(ctx, c) f.checkClient(ctx, c)
default: default:
return fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature) runtime.HandleError(fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature))
} }
} }
func (f *defaultFeatureSupportChecker) clientSupportsRequestWatchProgress(ctx context.Context, c client) error { func (f *defaultFeatureSupportChecker) checkClient(ctx context.Context, c client) {
// start with 10 ms, multiply by 2 each step, until 15 s and stays on 15 seconds.
delayFunc := wait.Backoff{
Duration: 10 * time.Millisecond,
Cap: 15 * time.Second,
Factor: 2.0,
Steps: 11}.DelayFunc()
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
for _, ep := range c.Endpoints() { for _, ep := range c.Endpoints() {
supported, err := f.supportsProgressNotifyEndpointLocked(ctx, c, ep) if _, found := f.checkingEndpoint[ep]; found {
continue
}
f.checkingEndpoint[ep] = struct{}{}
go func(ep string) {
defer runtime.HandleCrash()
err := delayFunc.Until(ctx, true, true, func(ctx context.Context) (done bool, err error) {
internalErr := f.clientSupportsRequestWatchProgress(ctx, c, ep)
return internalErr == nil, nil
})
if err != nil {
klog.ErrorS(err, "Failed to check if RequestWatchProgress is supported by etcd after retrying")
}
}(ep)
}
}
func (f *defaultFeatureSupportChecker) clientSupportsRequestWatchProgress(ctx context.Context, c client, ep string) error {
supported, err := endpointSupportsRequestWatchProgress(ctx, c, ep)
if err != nil { if err != nil {
return err return err
} }
f.lock.Lock()
defer f.lock.Unlock()
if !supported { if !supported {
klog.Infof("RequestWatchProgress feature is not supported by %q endpoint", ep)
f.progressNotifySupported = ptr.To(false) f.progressNotifySupported = ptr.To(false)
return nil return nil
} }
} if f.progressNotifySupported == nil {
if f.progressNotifySupported == nil && len(c.Endpoints()) > 0 {
f.progressNotifySupported = ptr.To(true) f.progressNotifySupported = ptr.To(true)
} }
return nil return nil
} }
func (f *defaultFeatureSupportChecker) supportsProgressNotifyEndpointLocked(ctx context.Context, c client, ep string) (bool, error) {
if supported, ok := f.progresNotifyEndpointCache[ep]; ok {
return supported, nil
}
supported, err := endpointSupportsRequestWatchProgress(ctx, c, ep)
if err != nil {
return false, err
}
f.progresNotifyEndpointCache[ep] = supported
return supported, nil
}
// Sub interface of etcd client. // Sub interface of etcd client.
type client interface { type client interface {
// Endpoints returns list of endpoints in etcd client. // Endpoints returns list of endpoints in etcd client.

View File

@ -76,20 +76,17 @@ func TestSupports(t *testing.T) {
testName string testName string
featureName string featureName string
expectedResult bool expectedResult bool
expectedError error
}{ }{
{ {
testName: "Error with unknown feature", testName: "Disabled - with unknown feature",
featureName: "some unknown feature", featureName: "some unknown feature",
expectedError: fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", "some unknown feature"),
}, },
{ {
testName: "Error with empty feature", testName: "Disabled - with empty feature",
featureName: "", featureName: "",
expectedError: fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", ""),
}, },
{ {
testName: "No error but disabled by default", testName: "Disabled - default",
featureName: storage.RequestWatchProgress, featureName: storage.RequestWatchProgress,
expectedResult: false, expectedResult: false,
}, },
@ -99,10 +96,9 @@ func TestSupports(t *testing.T) {
t.Run(tt.testName, func(t *testing.T) { t.Run(tt.testName, func(t *testing.T) {
var testFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker() var testFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker()
supported, err := testFeatureSupportChecker.Supports(tt.featureName) supported := testFeatureSupportChecker.Supports(tt.featureName)
assert.Equal(t, tt.expectedResult, supported) assert.Equal(t, tt.expectedResult, supported)
assert.Equal(t, tt.expectedError, err)
}) })
} }
} }
@ -254,18 +250,19 @@ func TestSupportsRequestWatchProgress(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) { t.Run(tt.testName, func(t *testing.T) {
var testFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker() var testFeatureSupportChecker = newDefaultFeatureSupportChecker()
for _, round := range tt.rounds { for _, round := range tt.rounds {
// Mock Etcd client // Mock Etcd client
mockClient := &MockEtcdClient{EndpointVersion: round.endpointsVersion} mockClient := &MockEtcdClient{EndpointVersion: round.endpointsVersion}
ctx := context.Background() ctx := context.Background()
err := testFeatureSupportChecker.CheckClient(ctx, mockClient, storage.RequestWatchProgress) for _, ep := range mockClient.Endpoints() {
assert.Equal(t, err, round.expectedError) err := testFeatureSupportChecker.clientSupportsRequestWatchProgress(ctx, mockClient, ep)
assert.Equal(t, round.expectedError, err)
}
// Error of Supports already tested in TestSupports. supported := testFeatureSupportChecker.Supports(storage.RequestWatchProgress)
supported, _ := testFeatureSupportChecker.Supports(storage.RequestWatchProgress) assert.Equal(t, round.expectedResult, supported)
assert.Equal(t, supported, round.expectedResult)
} }
}) })
} }

View File

@ -25,6 +25,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
apirequest "k8s.io/apiserver/pkg/endpoints/request" apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@ -165,9 +167,10 @@ func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool {
resourceVersion := opts.ResourceVersion resourceVersion := opts.ResourceVersion
match := opts.ResourceVersionMatch match := opts.ResourceVersionMatch
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
// Serve consistent reads from storage if ConsistentListFromCache is disabled // Serve consistent reads from storage if ConsistentListFromCache is disabled
consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
// Watch cache doesn't support continuations, so serve them from etcd. // Watch cache doesn't support continuations, so serve them from etcd.
hasContinuation := len(opts.Continue) > 0 hasContinuation := len(opts.Continue) > 0
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd. // Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.