Feat: warn user if etcd version is not supported for RequestWatchProgress

This commit is contained in:
ah8ad3 2024-04-29 15:56:35 +03:30 committed by Ahmad Zolfaghari
parent 4946c1fde2
commit 9f8273a5c5
6 changed files with 87 additions and 59 deletions

View File

@ -42,6 +42,7 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing"
@ -728,9 +729,10 @@ func shouldDelegateList(opts storage.ListOptions) bool {
pred := opts.Predicate
match := opts.ResourceVersionMatch
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
// Serve consistent reads from storage if ConsistentListFromCache is disabled
consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
// Watch cache doesn't support continuations, so serve them from etcd.
hasContinuation := len(pred.Continue) > 0
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
@ -773,7 +775,8 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
// minimal resource version, simply forward the request to storage.
return c.storage.GetList(ctx, key, opts, listObj)
}
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported {
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
if err != nil {
return err

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing"
@ -498,7 +499,8 @@ func (s sortableStoreElements) Swap(i, j int) {
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
// with their ResourceVersion and the name of the index, if any, that was used.
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && w.notFresh(resourceVersion) {
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove()

View File

@ -39,9 +39,12 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
)
@ -139,6 +142,9 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType)
}
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) || utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
etcdfeature.DefaultFeatureSupportChecker.CheckClient(c.Ctx(), c, storage.RequestWatchProgress)
}
return s
}

View File

@ -20,9 +20,12 @@ import (
"context"
"fmt"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
@ -43,86 +46,100 @@ type FeatureSupportChecker interface {
// Supports check if the feature is supported or not by checking internal cache.
// By default all calls to this function before calling CheckClient returns false.
// Returns true if all endpoints in etcd clients are supporting the feature.
Supports(feature storage.Feature) (bool, error)
// If client A supports and client B doesn't support the feature, the `Supports` will
// first return true at client A initializtion and then return false on client B
// initialzation, it can flip the support at runtime.
Supports(feature storage.Feature) bool
// CheckClient works with etcd client to recalcualte feature support and cache it internally.
// All etcd clients should support feature to cause `Supports` return true.
// If client A supports and client B doesn't support the feature, the `Supports` will
// first return true at client A initializtion and then return false on client B
// initialzation, it can flip the support at runtime.
CheckClient(ctx context.Context, c client, feature storage.Feature) error
CheckClient(ctx context.Context, c client, feature storage.Feature)
}
type defaultFeatureSupportChecker struct {
lock sync.Mutex
progressNotifySupported *bool
progresNotifyEndpointCache map[string]bool
lock sync.Mutex
progressNotifySupported *bool
checkingEndpoint map[string]struct{}
}
func newDefaultFeatureSupportChecker() *defaultFeatureSupportChecker {
return &defaultFeatureSupportChecker{
progresNotifyEndpointCache: make(map[string]bool),
checkingEndpoint: make(map[string]struct{}),
}
}
// Supports can check the featue from anywhere without storage if it was cached before.
func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) (bool, error) {
func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) bool {
switch feature {
case storage.RequestWatchProgress:
f.lock.Lock()
defer f.lock.Unlock()
return ptr.Deref(f.progressNotifySupported, false), nil
return ptr.Deref(f.progressNotifySupported, false)
default:
return false, fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature)
runtime.HandleError(fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature))
return false
}
}
// CheckClient accepts client and calculate the support per endpoint and caches it.
// It will return at any point if error happens or one endpoint is not supported.
func (f *defaultFeatureSupportChecker) CheckClient(ctx context.Context, c client, feature storage.Feature) error {
func (f *defaultFeatureSupportChecker) CheckClient(ctx context.Context, c client, feature storage.Feature) {
switch feature {
case storage.RequestWatchProgress:
return f.clientSupportsRequestWatchProgress(ctx, c)
f.checkClient(ctx, c)
default:
return fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature)
runtime.HandleError(fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature))
}
}
func (f *defaultFeatureSupportChecker) clientSupportsRequestWatchProgress(ctx context.Context, c client) error {
func (f *defaultFeatureSupportChecker) checkClient(ctx context.Context, c client) {
// start with 10 ms, multiply by 2 each step, until 15 s and stays on 15 seconds.
delayFunc := wait.Backoff{
Duration: 10 * time.Millisecond,
Cap: 15 * time.Second,
Factor: 2.0,
Steps: 11}.DelayFunc()
f.lock.Lock()
defer f.lock.Unlock()
for _, ep := range c.Endpoints() {
if _, found := f.checkingEndpoint[ep]; found {
continue
}
f.checkingEndpoint[ep] = struct{}{}
go func(ep string) {
defer runtime.HandleCrash()
err := delayFunc.Until(ctx, true, true, func(ctx context.Context) (done bool, err error) {
internalErr := f.clientSupportsRequestWatchProgress(ctx, c, ep)
return internalErr == nil, nil
})
if err != nil {
klog.ErrorS(err, "Failed to check if RequestWatchProgress is supported by etcd after retrying")
}
}(ep)
}
}
func (f *defaultFeatureSupportChecker) clientSupportsRequestWatchProgress(ctx context.Context, c client, ep string) error {
supported, err := endpointSupportsRequestWatchProgress(ctx, c, ep)
if err != nil {
return err
}
f.lock.Lock()
defer f.lock.Unlock()
for _, ep := range c.Endpoints() {
supported, err := f.supportsProgressNotifyEndpointLocked(ctx, c, ep)
if err != nil {
return err
}
if !supported {
f.progressNotifySupported = ptr.To(false)
return nil
}
if !supported {
klog.Infof("RequestWatchProgress feature is not supported by %q endpoint", ep)
f.progressNotifySupported = ptr.To(false)
return nil
}
if f.progressNotifySupported == nil && len(c.Endpoints()) > 0 {
if f.progressNotifySupported == nil {
f.progressNotifySupported = ptr.To(true)
}
return nil
}
func (f *defaultFeatureSupportChecker) supportsProgressNotifyEndpointLocked(ctx context.Context, c client, ep string) (bool, error) {
if supported, ok := f.progresNotifyEndpointCache[ep]; ok {
return supported, nil
}
supported, err := endpointSupportsRequestWatchProgress(ctx, c, ep)
if err != nil {
return false, err
}
f.progresNotifyEndpointCache[ep] = supported
return supported, nil
}
// Sub interface of etcd client.
type client interface {
// Endpoints returns list of endpoints in etcd client.

View File

@ -76,20 +76,17 @@ func TestSupports(t *testing.T) {
testName string
featureName string
expectedResult bool
expectedError error
}{
{
testName: "Error with unknown feature",
featureName: "some unknown feature",
expectedError: fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", "some unknown feature"),
testName: "Disabled - with unknown feature",
featureName: "some unknown feature",
},
{
testName: "Error with empty feature",
featureName: "",
expectedError: fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", ""),
testName: "Disabled - with empty feature",
featureName: "",
},
{
testName: "No error but disabled by default",
testName: "Disabled - default",
featureName: storage.RequestWatchProgress,
expectedResult: false,
},
@ -99,10 +96,9 @@ func TestSupports(t *testing.T) {
t.Run(tt.testName, func(t *testing.T) {
var testFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker()
supported, err := testFeatureSupportChecker.Supports(tt.featureName)
supported := testFeatureSupportChecker.Supports(tt.featureName)
assert.Equal(t, tt.expectedResult, supported)
assert.Equal(t, tt.expectedError, err)
})
}
}
@ -254,18 +250,19 @@ func TestSupportsRequestWatchProgress(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
var testFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker()
var testFeatureSupportChecker = newDefaultFeatureSupportChecker()
for _, round := range tt.rounds {
// Mock Etcd client
mockClient := &MockEtcdClient{EndpointVersion: round.endpointsVersion}
ctx := context.Background()
err := testFeatureSupportChecker.CheckClient(ctx, mockClient, storage.RequestWatchProgress)
assert.Equal(t, err, round.expectedError)
for _, ep := range mockClient.Endpoints() {
err := testFeatureSupportChecker.clientSupportsRequestWatchProgress(ctx, mockClient, ep)
assert.Equal(t, round.expectedError, err)
}
// Error of Supports already tested in TestSupports.
supported, _ := testFeatureSupportChecker.Supports(storage.RequestWatchProgress)
assert.Equal(t, supported, round.expectedResult)
supported := testFeatureSupportChecker.Supports(storage.RequestWatchProgress)
assert.Equal(t, round.expectedResult, supported)
}
})
}

View File

@ -25,6 +25,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
)
@ -165,9 +167,10 @@ func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool {
resourceVersion := opts.ResourceVersion
match := opts.ResourceVersionMatch
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
// Serve consistent reads from storage if ConsistentListFromCache is disabled
consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
// Watch cache doesn't support continuations, so serve them from etcd.
hasContinuation := len(opts.Continue) > 0
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.