From 4c635ecf8250c7493481b9b8fb88e384de1b41db Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 24 Feb 2025 19:07:25 +0100 Subject: [PATCH] Rename CacheProxy to CacheDelegator --- .../generic/registry/storage_factory.go | 2 +- .../registry/generic/registry/store_test.go | 2 +- .../apiserver/pkg/storage/cacher/cacher.go | 38 ---------- .../pkg/storage/cacher/cacher_test.go | 16 ++--- .../cacher/cacher_testing_utils_test.go | 2 +- .../storage/cacher/cacher_whitebox_test.go | 50 ++++++------- .../storage/cacher/{proxy.go => delegator.go} | 71 ++++++++++++++----- .../request/list_work_estimator.go | 2 +- 8 files changed, 92 insertions(+), 91 deletions(-) rename staging/src/k8s.io/apiserver/pkg/storage/cacher/{proxy.go => delegator.go} (70%) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go index a3fa29f1a19..fe4153d57ed 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go @@ -79,7 +79,7 @@ func StorageWithCacher() generic.StorageDecorator { }) } - return cacherstorage.NewCacheProxy(cacher, s), destroyFunc, nil + return cacherstorage.NewCacheDelegator(cacher, s), destroyFunc, nil } } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go index 96cfe4a3870..3cfb4f3427d 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go @@ -2459,7 +2459,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE } } d := destroyFunc - s = cacherstorage.NewCacheProxy(cacher, s) + s = cacherstorage.NewCacheDelegator(cacher, s) destroyFunc = func() { cacher.Stop() d() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 7109097701e..fc97c2af3b5 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -694,36 +694,6 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o return nil } -// NOTICE: Keep in sync with shouldListFromStorage function in -// -// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go -func shouldDelegateList(opts storage.ListOptions) bool { - // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list - switch opts.ResourceVersionMatch { - case metav1.ResourceVersionMatchExact: - return true - case metav1.ResourceVersionMatchNotOlderThan: - case "": - // Legacy exact match - if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" { - return true - } - default: - return true - } - // Continue - if len(opts.Predicate.Continue) > 0 { - return true - } - // Consistent Read - if opts.ResourceVersion == "" { - consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) - requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) - return !consistentListFromCacheEnabled || !requestWatchProgressSupported - } - return false -} - // computeListLimit determines whether the cacher should // apply a limit to an incoming LIST request and returns its value. // @@ -738,14 +708,6 @@ func computeListLimit(opts storage.ListOptions) int64 { return opts.Predicate.Limit } -func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool { - pred := opts.Predicate - noLabelSelector := pred.Label == nil || pred.Label.Empty() - noFieldSelector := pred.Field == nil || pred.Field.Empty() - hasLimit := pred.Limit > 0 - return noLabelSelector && noFieldSelector && hasLimit -} - func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) (listResp, string, error) { if !recursive { obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index 57b857a21e4..59fc7183d75 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -455,12 +455,12 @@ func withNodeNameAndNamespaceIndex(options *setupOptions) { } } -func testSetup(t *testing.T, opts ...setupOption) (context.Context, *CacheProxy, tearDownFunc) { +func testSetup(t *testing.T, opts ...setupOption) (context.Context, *CacheDelegator, tearDownFunc) { ctx, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...) return ctx, cacher, tearDown } -func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context, *CacheProxy, *etcd3testing.EtcdTestServer, tearDownFunc) { +func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context, *CacheDelegator, *etcd3testing.EtcdTestServer, tearDownFunc) { setupOpts := setupOptions{} opts = append([]setupOption{withDefaults}, opts...) for _, opt := range opts { @@ -514,7 +514,7 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context } } - return ctx, NewCacheProxy(cacher, wrappedStorage), server, terminate + return ctx, NewCacheDelegator(cacher, wrappedStorage), server, terminate } func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) { @@ -525,20 +525,20 @@ func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (stora t.Fatalf("unexpected error waiting for the cache to be ready") } } - return &createWrapper{CacheProxy: cacher}, tearDown + return &createWrapper{CacheDelegator: cacher}, tearDown } type createWrapper struct { - *CacheProxy + *CacheDelegator } func (c *createWrapper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { - if err := c.CacheProxy.Create(ctx, key, obj, out, ttl); err != nil { + if err := c.CacheDelegator.Create(ctx, key, obj, out, ttl); err != nil { return err } return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { - currentObj := c.CacheProxy.cacher.newFunc() - err := c.CacheProxy.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj) + currentObj := c.CacheDelegator.cacher.newFunc() + err := c.CacheDelegator.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj) if err != nil { if storage.IsNotFound(err) { return false, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go index dae46e291a4..bd574ab132e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go @@ -78,7 +78,7 @@ func computePodKey(obj *example.Pod) string { return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name) } -func compactStorage(c *CacheProxy, client *clientv3.Client) storagetesting.Compaction { +func compactStorage(c *CacheDelegator, client *clientv3.Client) storagetesting.Compaction { return func(ctx context.Context, t *testing.T, resourceVersion string) { versioner := storage.APIObjectVersioner{} rv, err := versioner.ParseResourceVersion(resourceVersion) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index f13ed32275f..0b0dab1504f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -317,7 +317,7 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp t.Fatalf("Couldn't create cacher: %v", err) } defer cacher.Stop() - proxy := NewCacheProxy(cacher, backingStorage) + delegator := NewCacheDelegator(cacher, backingStorage) result := &example.PodList{} if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { if err := cacher.ready.wait(context.Background()); err != nil { @@ -342,7 +342,7 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp return nil } } - err = proxy.GetList(context.TODO(), "pods/ns", options, result) + err = delegator.GetList(context.TODO(), "pods/ns", options, result) gotBypass := errors.Is(err, errDummy) if err != nil && !gotBypass { t.Fatalf("Unexpected error for List request with options: %v, err: %v", options, err) @@ -441,7 +441,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su t.Fatalf("Couldn't create cacher: %v", err) } defer cacher.Stop() - proxy := NewCacheProxy(cacher, backingStorage) + delegator := NewCacheDelegator(cacher, backingStorage) if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } @@ -484,7 +484,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su } start := cacher.clock.Now() - err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result) + err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result) clockStepCancelFn() duration := cacher.clock.Since(start) if (err != nil) != tc.expectError { @@ -516,7 +516,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) { t.Fatalf("Couldn't create cacher: %v", err) } defer cacher.Stop() - proxy := NewCacheProxy(cacher, backingStorage) + delegator := NewCacheDelegator(cacher, backingStorage) pred := storage.SelectionPredicate{ Limit: 500, @@ -531,7 +531,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) { // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.injectError(errDummy) - err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{ + err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{ ResourceVersion: "0", Predicate: pred, }, result) @@ -539,7 +539,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) { t.Errorf("GetList with Limit and RV=0 should be served from cache: %v", err) } - err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{ + err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{ ResourceVersion: "", Predicate: pred, }, result) @@ -555,7 +555,7 @@ func TestGetCacheBypass(t *testing.T) { t.Fatalf("Couldn't create cacher: %v", err) } defer cacher.Stop() - proxy := NewCacheProxy(cacher, backingStorage) + delegator := NewCacheDelegator(cacher, backingStorage) result := &example.Pod{} @@ -567,7 +567,7 @@ func TestGetCacheBypass(t *testing.T) { // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.injectError(errDummy) - err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ + err = delegator.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ IgnoreNotFound: true, ResourceVersion: "0", }, result) @@ -575,7 +575,7 @@ func TestGetCacheBypass(t *testing.T) { t.Errorf("Get with RV=0 should be served from cache: %v", err) } - err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ + err = delegator.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ IgnoreNotFound: true, ResourceVersion: "", }, result) @@ -591,7 +591,7 @@ func TestWatchCacheBypass(t *testing.T) { t.Fatalf("Couldn't create cacher: %v", err) } defer cacher.Stop() - proxy := NewCacheProxy(cacher, backingStorage) + delegator := NewCacheDelegator(cacher, backingStorage) if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { if err := cacher.ready.wait(context.Background()); err != nil { @@ -599,7 +599,7 @@ func TestWatchCacheBypass(t *testing.T) { } } - _, err = proxy.Watch(context.TODO(), "pod/ns", storage.ListOptions{ + _, err = delegator.Watch(context.TODO(), "pod/ns", storage.ListOptions{ ResourceVersion: "0", Predicate: storage.Everything, }) @@ -607,7 +607,7 @@ func TestWatchCacheBypass(t *testing.T) { t.Errorf("Watch with RV=0 should be served from cache: %v", err) } - _, err = proxy.Watch(context.TODO(), "pod/ns", storage.ListOptions{ + _, err = delegator.Watch(context.TODO(), "pod/ns", storage.ListOptions{ ResourceVersion: "", Predicate: storage.Everything, }) @@ -628,7 +628,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) { t.Fatalf("Couldn't create cacher: %v", err) } defer cacher.Stop() - proxy := NewCacheProxy(cacher, backingStorage) + delegator := NewCacheDelegator(cacher, backingStorage) opts := storage.ListOptions{ ResourceVersion: "0", @@ -640,7 +640,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) { defer listCancel() result := &example.PodList{} - err = proxy.GetList(listCtx, "/pods/ns", opts, result) + err = delegator.GetList(listCtx, "/pods/ns", opts, result) if err != nil && apierrors.IsTooManyRequests(err) { t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for List") } @@ -648,7 +648,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) { watchCtx, watchCancel := context.WithTimeout(context.Background(), 250*time.Millisecond) defer watchCancel() - _, err = proxy.Watch(watchCtx, "/pods/ns", opts) + _, err = delegator.Watch(watchCtx, "/pods/ns", opts) if err != nil && apierrors.IsTooManyRequests(err) { t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for Watch") } @@ -873,7 +873,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) { if err != nil { t.Fatalf("Couldn't create cacher: %v", err) } - proxy := NewCacheProxy(cacher, backingStorage) + delegator := NewCacheDelegator(cacher, backingStorage) if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { if err := cacher.ready.wait(context.Background()); err != nil { @@ -881,7 +881,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) { } } - w, err := proxy.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + w, err := delegator.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Failed to create watch: %v", err) } @@ -901,13 +901,13 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) { cacher.Stop() - _, err = proxy.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + _, err = delegator.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err == nil { t.Fatalf("Success to create Watch: %v", err) } result := &example.Pod{} - err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ + err = delegator.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ IgnoreNotFound: true, ResourceVersion: "1", }, result) @@ -922,7 +922,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) { } listResult := &example.PodList{} - err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{ + err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{ ResourceVersion: "1", Recursive: true, Predicate: storage.SelectionPredicate{ @@ -2291,7 +2291,7 @@ func BenchmarkCacher_GetList(b *testing.B) { b.Fatalf("new cacher: %v", err) } defer cacher.Stop() - proxy := NewCacheProxy(cacher, store) + delegator := NewCacheDelegator(cacher, store) // prepare result and pred parsedField, err := fields.ParseSelector("spec.nodeName=node-0") @@ -2307,7 +2307,7 @@ func BenchmarkCacher_GetList(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { result := &example.PodList{} - err = proxy.GetList(context.TODO(), "pods", storage.ListOptions{ + err = delegator.GetList(context.TODO(), "pods", storage.ListOptions{ Predicate: pred, Recursive: true, ResourceVersion: "12345", @@ -3179,8 +3179,8 @@ func TestRetryAfterForUnreadyCache(t *testing.T) { Predicate: storage.Everything, } result := &example.PodList{} - proxy := NewCacheProxy(cacher, backingStorage) - err = proxy.GetList(context.TODO(), "/pods/ns", opts, result) + delegator := NewCacheDelegator(cacher, backingStorage) + err = delegator.GetList(context.TODO(), "/pods/ns", opts, result) if !apierrors.IsTooManyRequests(err) { t.Fatalf("Unexpected GetList error: %v", err) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/proxy.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go similarity index 70% rename from staging/src/k8s.io/apiserver/pkg/storage/cacher/proxy.go rename to staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go index b49544a71b2..79824f6d67a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/proxy.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/otel/attribute" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/audit" @@ -34,29 +35,29 @@ import ( "k8s.io/klog/v2" ) -func NewCacheProxy(cacher *Cacher, storage storage.Interface) *CacheProxy { - return &CacheProxy{ +func NewCacheDelegator(cacher *Cacher, storage storage.Interface) *CacheDelegator { + return &CacheDelegator{ cacher: cacher, storage: storage, } } -type CacheProxy struct { +type CacheDelegator struct { cacher *Cacher storage storage.Interface } -var _ storage.Interface = (*CacheProxy)(nil) +var _ storage.Interface = (*CacheDelegator)(nil) -func (c *CacheProxy) Versioner() storage.Versioner { +func (c *CacheDelegator) Versioner() storage.Versioner { return c.storage.Versioner() } -func (c *CacheProxy) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { +func (c *CacheDelegator) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { return c.storage.Create(ctx, key, obj, out, ttl) } -func (c *CacheProxy) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error { +func (c *CacheDelegator) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error { // Ignore the suggestion and try to pass down the current version of the object // read from cache. if elem, exists, err := c.cacher.watchCache.GetByKey(key); err != nil { @@ -71,7 +72,7 @@ func (c *CacheProxy) Delete(ctx context.Context, key string, out runtime.Object, return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil, opts) } -func (c *CacheProxy) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { +func (c *CacheDelegator) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { // if the watch-list feature wasn't set and the resourceVersion is unset // ensure that the rv from which the watch is being served, is the latest // one. "latest" is ensured by serving the watch from @@ -89,7 +90,7 @@ func (c *CacheProxy) Watch(ctx context.Context, key string, opts storage.ListOpt return c.cacher.Watch(ctx, key, opts) } -func (c *CacheProxy) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { +func (c *CacheDelegator) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { ctx, span := tracing.Start(ctx, "cacher.Get", attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)), attribute.String("key", key), @@ -104,7 +105,7 @@ func (c *CacheProxy) Get(ctx context.Context, key string, opts storage.GetOption if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { if !c.cacher.Ready() { - // If Cache is not initialized, delegate Get requests to storage + // If Cache is not initialized, delegator Get requests to storage // as described in https://kep.k8s.io/4568 span.AddEvent("About to Get from underlying storage - cache not initialized") return c.storage.Get(ctx, key, opts, objPtr) @@ -133,7 +134,7 @@ func (c *CacheProxy) Get(ctx context.Context, key string, opts storage.GetOption return c.cacher.Get(ctx, key, opts, objPtr) } -func (c *CacheProxy) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { +func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { if shouldDelegateList(opts) { return c.storage.GetList(ctx, key, opts, listObj) } @@ -145,7 +146,7 @@ func (c *CacheProxy) GetList(ctx context.Context, key string, opts storage.ListO if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { if !c.cacher.Ready() && shouldDelegateListOnNotReadyCache(opts) { - // If Cacher is not initialized, delegate List requests to storage + // If Cacher is not initialized, delegator List requests to storage // as described in https://kep.k8s.io/4568 return c.storage.GetList(ctx, key, opts, listObj) } @@ -186,7 +187,45 @@ func (c *CacheProxy) GetList(ctx context.Context, key string, opts storage.ListO return nil } -func (c *CacheProxy) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error { +func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool { + pred := opts.Predicate + noLabelSelector := pred.Label == nil || pred.Label.Empty() + noFieldSelector := pred.Field == nil || pred.Field.Empty() + hasLimit := pred.Limit > 0 + return noLabelSelector && noFieldSelector && hasLimit +} + +// NOTICE: Keep in sync with shouldListFromStorage function in +// +// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go +func shouldDelegateList(opts storage.ListOptions) bool { + // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list + switch opts.ResourceVersionMatch { + case metav1.ResourceVersionMatchExact: + return true + case metav1.ResourceVersionMatchNotOlderThan: + case "": + // Legacy exact match + if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" { + return true + } + default: + return true + } + // Continue + if len(opts.Predicate.Continue) > 0 { + return true + } + // Consistent Read + if opts.ResourceVersion == "" { + consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) + requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) + return !consistentListFromCacheEnabled || !requestWatchProgressSupported + } + return false +} + +func (c *CacheDelegator) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error { // Ignore the suggestion and try to pass down the current version of the object // read from cache. if elem, exists, err := c.cacher.watchCache.GetByKey(key); err != nil { @@ -201,17 +240,17 @@ func (c *CacheProxy) GuaranteedUpdate(ctx context.Context, key string, destinati return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil) } -func (c *CacheProxy) Count(pathPrefix string) (int64, error) { +func (c *CacheDelegator) Count(pathPrefix string) (int64, error) { return c.storage.Count(pathPrefix) } -func (c *CacheProxy) ReadinessCheck() error { +func (c *CacheDelegator) ReadinessCheck() error { if !c.cacher.Ready() { return storage.ErrStorageNotReady } return nil } -func (c *CacheProxy) RequestWatchProgress(ctx context.Context) error { +func (c *CacheDelegator) RequestWatchProgress(ctx context.Context) error { return c.storage.RequestWatchProgress(ctx) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go index 2546593d692..e1664be8ee5 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go @@ -162,7 +162,7 @@ func key(requestInfo *apirequest.RequestInfo) string { // NOTICE: Keep in sync with shouldDelegateList function in // -// staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +// staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool { // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list switch opts.ResourceVersionMatch {