From 984b475e74904dd61c10b23472798a21496edc8f Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 17 Mar 2025 15:46:02 +0100 Subject: [PATCH] Extract delegator.Helper interface to allow making delegate decision based on cache state --- .../storage/cacher/cacher_whitebox_test.go | 10 ++- .../apiserver/pkg/storage/cacher/delegator.go | 37 +++++----- .../pkg/storage/cacher/delegator/interface.go | 73 +++++++++++++++++++ .../pkg/storage/cacher/delegator_test.go | 55 -------------- .../pkg/storage/cacher/watch_cache.go | 8 +- .../request/list_work_estimator.go | 34 ++++----- 6 files changed, 118 insertions(+), 99 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator/interface.go diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index f8ce760e55b..66f399cecb4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -48,6 +48,7 @@ import ( examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/cacher/delegator" "k8s.io/apiserver/pkg/storage/cacher/metrics" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" etcdfeature "k8s.io/apiserver/pkg/storage/feature" @@ -334,9 +335,12 @@ func TestShouldDelegateList(t *testing.T) { expectBypass = bypass } } - gotBypass, _ := shouldDelegateList(toStorageOpts(opt)) - if gotBypass != expectBypass { - t.Errorf("Unexpected bypass result for List request with options %+v, bypass expected: %v, got: %v", opt, expectBypass, gotBypass) + result, err := shouldDelegateList(toStorageOpts(opt), delegator.CacheWithoutSnapshots{}) + if err != nil { + t.Fatal(err) + } + if result.ShouldDelegate != expectBypass { + t.Errorf("Unexpected bypass result for List request with options %+v, bypass expected: %v, got: %v", opt, expectBypass, result.ShouldDelegate) } } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go index dd2388bc13f..6ac329c7337 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go @@ -36,8 +36,8 @@ import ( "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/cacher/delegator" "k8s.io/apiserver/pkg/storage/cacher/metrics" - etcdfeature "k8s.io/apiserver/pkg/storage/feature" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/tracing" "k8s.io/klog/v2" @@ -180,8 +180,11 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L if err != nil { return err } - shouldDelegate, consistentRead := shouldDelegateList(opts) - if shouldDelegate { + result, err := shouldDelegateList(opts, delegator.CacheWithoutSnapshots{}) + if err != nil { + return err + } + if result.ShouldDelegate { return c.storage.GetList(ctx, key, opts, listObj) } @@ -203,7 +206,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L return c.storage.GetList(ctx, key, opts, listObj) } } - if consistentRead { + if result.ConsistentRead { listRV, err = c.storage.GetCurrentResourceVersion(ctx) if err != nil { return err @@ -215,7 +218,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L success := "true" fallback := "false" if err != nil { - if consistentRead { + if result.ConsistentRead { if storage.IsTooLargeResourceVersion(err) { fallback = "true" // Reset resourceVersion during fallback from consistent read. @@ -229,7 +232,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L } return err } - if consistentRead { + if result.ConsistentRead { metrics.ConsistentReadTotal.WithLabelValues(c.cacher.resourcePrefix, success, fallback).Add(1) } return nil @@ -243,36 +246,32 @@ func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool { return noLabelSelector && noFieldSelector && hasLimit } -// NOTICE: Keep in sync with shouldListFromStorage function in +// NOTICE: Keep in sync with shouldDelegateList function in // // staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go -func shouldDelegateList(opts storage.ListOptions) (shouldDeletage, consistentRead bool) { +func shouldDelegateList(opts storage.ListOptions, cache delegator.Helper) (delegator.Result, error) { // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list - consistentRead = false switch opts.ResourceVersionMatch { case metav1.ResourceVersionMatchExact: - return true, consistentRead + return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive) case metav1.ResourceVersionMatchNotOlderThan: - return false, consistentRead + return delegator.Result{ShouldDelegate: false}, nil case "": // Legacy exact match if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" { - return true, consistentRead + return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive) } // Continue if len(opts.Predicate.Continue) > 0 { - return true, consistentRead + return cache.ShouldDelegateContinue(opts.Predicate.Continue, opts.Recursive) } // Consistent Read if opts.ResourceVersion == "" { - consistentRead = true - consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) - requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) - return !consistentListFromCacheEnabled || !requestWatchProgressSupported, consistentRead + return cache.ShouldDelegateConsistentRead() } - return false, consistentRead + return delegator.Result{ShouldDelegate: false}, nil default: - return true, consistentRead + return delegator.Result{ShouldDelegate: true}, nil } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator/interface.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator/interface.go new file mode 100644 index 00000000000..fa56004e1b7 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator/interface.go @@ -0,0 +1,73 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delegator + +import ( + "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/storage" + etcdfeature "k8s.io/apiserver/pkg/storage/feature" + utilfeature "k8s.io/apiserver/pkg/util/feature" +) + +type Helper interface { + ShouldDelegateExactRV(rv string, recursive bool) (Result, error) + ShouldDelegateContinue(continueToken string, recursive bool) (Result, error) + ShouldDelegateConsistentRead() (Result, error) +} + +// Result of delegator decision. +type Result struct { + // Whether a request cannot be served by cache and should be delegated to etcd. + ShouldDelegate bool + // Whether a request is a consistent read, used by delegator to decide if it should call GetCurrentResourceVersion to get RV. + // Included in interface as only cacher has keyPrefix needed to parse continue token. + ConsistentRead bool +} + +type CacheWithoutSnapshots struct{} + +var _ Helper = CacheWithoutSnapshots{} + +func (c CacheWithoutSnapshots) ShouldDelegateContinue(continueToken string, recursive bool) (Result, error) { + return Result{ + ShouldDelegate: true, + // Continue with negative RV is considered a consistent read, however token cannot be parsed without keyPrefix unavailable in staging/src/k8s.io/apiserver/pkg/util/flow_control/request/list_work_estimator.go. + ConsistentRead: false, + }, nil +} + +func (c CacheWithoutSnapshots) ShouldDelegateExactRV(rv string, recursive bool) (Result, error) { + return Result{ + ShouldDelegate: true, + ConsistentRead: false, + }, nil +} + +func (c CacheWithoutSnapshots) ShouldDelegateConsistentRead() (Result, error) { + return Result{ + ShouldDelegate: !ConsistentReadSupported(), + ConsistentRead: true, + }, nil +} + +// ConsistentReadSupported returns whether cache can be used to serve reads with RV not yet observed by cache, including both consistent reads. +// Function is located here to avoid import cycles between staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go and staging/src/k8s.io/apiserver/pkg/util/flow_control/request/list_work_estimator.go. +func ConsistentReadSupported() bool { + consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) + requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) + return consistentListFromCacheEnabled && requestWatchProgressSupported +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator_test.go index cea88f556b3..f812e169f54 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator_test.go @@ -20,8 +20,6 @@ import ( "context" "testing" - "k8s.io/apimachinery/pkg/apis/meta/internalversion" - "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/apis/example" @@ -194,56 +192,3 @@ func TestCalculateDigest(t *testing.T) { }) } } - -func TestValidateUndelegatedListOptions(t *testing.T) { - opts := []storage.ListOptions{} - keyPrefix := "/pods/" - continueOnRV1, err := storage.EncodeContinue("/pods/a", keyPrefix, 1) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - continueOnNegativeRV, err := storage.EncodeContinue("/pods/a", keyPrefix, -1) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - for _, rv := range []string{"", "0", "1"} { - for _, match := range []metav1.ResourceVersionMatch{"", metav1.ResourceVersionMatchExact, metav1.ResourceVersionMatchNotOlderThan} { - for _, c := range []string{"", continueOnRV1, continueOnNegativeRV} { - for _, limit := range []int64{0, 100} { - for _, recursive := range []bool{true, false} { - opt := storage.ListOptions{ - ResourceVersion: rv, - ResourceVersionMatch: match, - Predicate: storage.SelectionPredicate{ - Limit: limit, - Continue: c, - }, - Recursive: recursive, - } - // Skip requests that will not pass apiserver validation - if errs := validation.ValidateListOptions(&internalversion.ListOptions{ - ResourceVersion: opt.ResourceVersion, - ResourceVersionMatch: opt.ResourceVersionMatch, - Limit: opt.Predicate.Limit, - Continue: opt.Predicate.Continue, - }, false); len(errs) != 0 { - continue - } - // Skip requests sent directly to etcd - if delegateList, _ := shouldDelegateList(opt); delegateList { - continue - } - opts = append(opts, opt) - } - - } - } - } - } - for _, opt := range opts { - _, _, err := storage.ValidateListOptions(keyPrefix, storage.APIObjectVersioner{}, opt) - if err != nil { - t.Errorf("Expected List requests %+v to pass validation, got: %v", opt, err) - } - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 1cd37a9fbaa..771af215383 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -32,9 +32,9 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/cacher/delegator" "k8s.io/apiserver/pkg/storage/cacher/metrics" "k8s.io/apiserver/pkg/storage/cacher/progress" - etcdfeature "k8s.io/apiserver/pkg/storage/feature" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" "k8s.io/component-base/tracing" @@ -496,8 +496,7 @@ func (s sortableStoreElements) Swap(i, j int) { // WaitUntilFreshAndList returns list of pointers to `storeElement` objects along // with their ResourceVersion and the name of the index, if any, that was used. func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, opts storage.ListOptions) (resp listResp, index string, err error) { - requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) - if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) { + if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) { w.waitingUntilFresh.Add() err = w.waitUntilFreshAndBlock(ctx, resourceVersion) w.waitingUntilFresh.Remove() @@ -562,8 +561,7 @@ func (w *watchCache) notFresh(resourceVersion uint64) bool { // WaitUntilFreshAndGet returns a pointers to object. func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) { var err error - requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) - if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) { + if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) { w.waitingUntilFresh.Add() err = w.waitUntilFreshAndBlock(ctx, resourceVersion) w.waitingUntilFresh.Remove() diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go index 29015271a5f..004614314b3 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go @@ -23,10 +23,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" apirequest "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/apiserver/pkg/features" - "k8s.io/apiserver/pkg/storage" - etcdfeature "k8s.io/apiserver/pkg/storage/feature" - utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/apiserver/pkg/storage/cacher/delegator" "k8s.io/klog/v2" ) @@ -85,7 +82,12 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe return WorkEstimate{InitialSeats: e.config.MinimumSeats} } } - listFromStorage, _ := shouldListFromStorage(&listOptions) + // TODO: Check whether watchcache is enabled. + result, err := shouldDelegateList(&listOptions, delegator.CacheWithoutSnapshots{}) + if err != nil { + return WorkEstimate{InitialSeats: maxSeats} + } + listFromStorage := result.ShouldDelegate isListFromCache := requestInfo.Verb == "watch" || !listFromStorage numStored, err := e.countGetterFn(key(requestInfo)) @@ -162,32 +164,30 @@ func key(requestInfo *apirequest.RequestInfo) string { // NOTICE: Keep in sync with shouldDelegateList function in // // staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go -func shouldListFromStorage(opts *metav1.ListOptions) (shouldDeletage, consistentRead bool) { +func shouldDelegateList(opts *metav1.ListOptions, cache delegator.Helper) (delegator.Result, error) { // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list - consistentRead = false switch opts.ResourceVersionMatch { case metav1.ResourceVersionMatchExact: - return true, consistentRead + return cache.ShouldDelegateExactRV(opts.ResourceVersion, defaultRecursive) case metav1.ResourceVersionMatchNotOlderThan: - return false, consistentRead + return delegator.Result{ShouldDelegate: false}, nil case "": // Legacy exact match if opts.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" { - return true, consistentRead + return cache.ShouldDelegateExactRV(opts.ResourceVersion, defaultRecursive) } // Continue if len(opts.Continue) > 0 { - return true, consistentRead + return cache.ShouldDelegateContinue(opts.Continue, defaultRecursive) } // Consistent Read if opts.ResourceVersion == "" { - consistentRead = true - consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) - requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) - return !consistentListFromCacheEnabled || !requestWatchProgressSupported, consistentRead + return cache.ShouldDelegateConsistentRead() } - return false, consistentRead + return delegator.Result{ShouldDelegate: false}, nil default: - return true, consistentRead + return delegator.Result{ShouldDelegate: true}, nil } } + +var defaultRecursive = true