Extract delegator.Helper interface to allow making delegate decision based on cache state

This commit is contained in:
Marek Siarkowicz 2025-03-17 15:46:02 +01:00 committed by Marek Siarkowicz
parent 917a556981
commit 984b475e74
6 changed files with 118 additions and 99 deletions

View File

@ -48,6 +48,7 @@ import (
examplev1 "k8s.io/apiserver/pkg/apis/example/v1" examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/delegator"
"k8s.io/apiserver/pkg/storage/cacher/metrics" "k8s.io/apiserver/pkg/storage/cacher/metrics"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
etcdfeature "k8s.io/apiserver/pkg/storage/feature" etcdfeature "k8s.io/apiserver/pkg/storage/feature"
@ -334,9 +335,12 @@ func TestShouldDelegateList(t *testing.T) {
expectBypass = bypass expectBypass = bypass
} }
} }
gotBypass, _ := shouldDelegateList(toStorageOpts(opt)) result, err := shouldDelegateList(toStorageOpts(opt), delegator.CacheWithoutSnapshots{})
if gotBypass != expectBypass { if err != nil {
t.Errorf("Unexpected bypass result for List request with options %+v, bypass expected: %v, got: %v", opt, expectBypass, gotBypass) t.Fatal(err)
}
if result.ShouldDelegate != expectBypass {
t.Errorf("Unexpected bypass result for List request with options %+v, bypass expected: %v, got: %v", opt, expectBypass, result.ShouldDelegate)
} }
} }
} }

View File

@ -36,8 +36,8 @@ import (
"k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/delegator"
"k8s.io/apiserver/pkg/storage/cacher/metrics" "k8s.io/apiserver/pkg/storage/cacher/metrics"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/tracing" "k8s.io/component-base/tracing"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -180,8 +180,11 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
if err != nil { if err != nil {
return err return err
} }
shouldDelegate, consistentRead := shouldDelegateList(opts) result, err := shouldDelegateList(opts, delegator.CacheWithoutSnapshots{})
if shouldDelegate { if err != nil {
return err
}
if result.ShouldDelegate {
return c.storage.GetList(ctx, key, opts, listObj) return c.storage.GetList(ctx, key, opts, listObj)
} }
@ -203,7 +206,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
return c.storage.GetList(ctx, key, opts, listObj) return c.storage.GetList(ctx, key, opts, listObj)
} }
} }
if consistentRead { if result.ConsistentRead {
listRV, err = c.storage.GetCurrentResourceVersion(ctx) listRV, err = c.storage.GetCurrentResourceVersion(ctx)
if err != nil { if err != nil {
return err return err
@ -215,7 +218,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
success := "true" success := "true"
fallback := "false" fallback := "false"
if err != nil { if err != nil {
if consistentRead { if result.ConsistentRead {
if storage.IsTooLargeResourceVersion(err) { if storage.IsTooLargeResourceVersion(err) {
fallback = "true" fallback = "true"
// Reset resourceVersion during fallback from consistent read. // Reset resourceVersion during fallback from consistent read.
@ -229,7 +232,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
} }
return err return err
} }
if consistentRead { if result.ConsistentRead {
metrics.ConsistentReadTotal.WithLabelValues(c.cacher.resourcePrefix, success, fallback).Add(1) metrics.ConsistentReadTotal.WithLabelValues(c.cacher.resourcePrefix, success, fallback).Add(1)
} }
return nil return nil
@ -243,36 +246,32 @@ func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
return noLabelSelector && noFieldSelector && hasLimit return noLabelSelector && noFieldSelector && hasLimit
} }
// NOTICE: Keep in sync with shouldListFromStorage function in // NOTICE: Keep in sync with shouldDelegateList function in
// //
// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go // staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go
func shouldDelegateList(opts storage.ListOptions) (shouldDeletage, consistentRead bool) { func shouldDelegateList(opts storage.ListOptions, cache delegator.Helper) (delegator.Result, error) {
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
consistentRead = false
switch opts.ResourceVersionMatch { switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchExact: case metav1.ResourceVersionMatchExact:
return true, consistentRead return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive)
case metav1.ResourceVersionMatchNotOlderThan: case metav1.ResourceVersionMatchNotOlderThan:
return false, consistentRead return delegator.Result{ShouldDelegate: false}, nil
case "": case "":
// Legacy exact match // Legacy exact match
if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" { if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return true, consistentRead return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive)
} }
// Continue // Continue
if len(opts.Predicate.Continue) > 0 { if len(opts.Predicate.Continue) > 0 {
return true, consistentRead return cache.ShouldDelegateContinue(opts.Predicate.Continue, opts.Recursive)
} }
// Consistent Read // Consistent Read
if opts.ResourceVersion == "" { if opts.ResourceVersion == "" {
consistentRead = true return cache.ShouldDelegateConsistentRead()
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
return !consistentListFromCacheEnabled || !requestWatchProgressSupported, consistentRead
} }
return false, consistentRead return delegator.Result{ShouldDelegate: false}, nil
default: default:
return true, consistentRead return delegator.Result{ShouldDelegate: true}, nil
} }
} }

View File

@ -0,0 +1,73 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package delegator
import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)
type Helper interface {
ShouldDelegateExactRV(rv string, recursive bool) (Result, error)
ShouldDelegateContinue(continueToken string, recursive bool) (Result, error)
ShouldDelegateConsistentRead() (Result, error)
}
// Result of delegator decision.
type Result struct {
// Whether a request cannot be served by cache and should be delegated to etcd.
ShouldDelegate bool
// Whether a request is a consistent read, used by delegator to decide if it should call GetCurrentResourceVersion to get RV.
// Included in interface as only cacher has keyPrefix needed to parse continue token.
ConsistentRead bool
}
type CacheWithoutSnapshots struct{}
var _ Helper = CacheWithoutSnapshots{}
func (c CacheWithoutSnapshots) ShouldDelegateContinue(continueToken string, recursive bool) (Result, error) {
return Result{
ShouldDelegate: true,
// Continue with negative RV is considered a consistent read, however token cannot be parsed without keyPrefix unavailable in staging/src/k8s.io/apiserver/pkg/util/flow_control/request/list_work_estimator.go.
ConsistentRead: false,
}, nil
}
func (c CacheWithoutSnapshots) ShouldDelegateExactRV(rv string, recursive bool) (Result, error) {
return Result{
ShouldDelegate: true,
ConsistentRead: false,
}, nil
}
func (c CacheWithoutSnapshots) ShouldDelegateConsistentRead() (Result, error) {
return Result{
ShouldDelegate: !ConsistentReadSupported(),
ConsistentRead: true,
}, nil
}
// ConsistentReadSupported returns whether cache can be used to serve reads with RV not yet observed by cache, including both consistent reads.
// Function is located here to avoid import cycles between staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go and staging/src/k8s.io/apiserver/pkg/util/flow_control/request/list_work_estimator.go.
func ConsistentReadSupported() bool {
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
return consistentListFromCacheEnabled && requestWatchProgressSupported
}

View File

@ -20,8 +20,6 @@ import (
"context" "context"
"testing" "testing"
"k8s.io/apimachinery/pkg/apis/meta/internalversion"
"k8s.io/apimachinery/pkg/apis/meta/internalversion/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/apis/example"
@ -194,56 +192,3 @@ func TestCalculateDigest(t *testing.T) {
}) })
} }
} }
func TestValidateUndelegatedListOptions(t *testing.T) {
opts := []storage.ListOptions{}
keyPrefix := "/pods/"
continueOnRV1, err := storage.EncodeContinue("/pods/a", keyPrefix, 1)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
continueOnNegativeRV, err := storage.EncodeContinue("/pods/a", keyPrefix, -1)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for _, rv := range []string{"", "0", "1"} {
for _, match := range []metav1.ResourceVersionMatch{"", metav1.ResourceVersionMatchExact, metav1.ResourceVersionMatchNotOlderThan} {
for _, c := range []string{"", continueOnRV1, continueOnNegativeRV} {
for _, limit := range []int64{0, 100} {
for _, recursive := range []bool{true, false} {
opt := storage.ListOptions{
ResourceVersion: rv,
ResourceVersionMatch: match,
Predicate: storage.SelectionPredicate{
Limit: limit,
Continue: c,
},
Recursive: recursive,
}
// Skip requests that will not pass apiserver validation
if errs := validation.ValidateListOptions(&internalversion.ListOptions{
ResourceVersion: opt.ResourceVersion,
ResourceVersionMatch: opt.ResourceVersionMatch,
Limit: opt.Predicate.Limit,
Continue: opt.Predicate.Continue,
}, false); len(errs) != 0 {
continue
}
// Skip requests sent directly to etcd
if delegateList, _ := shouldDelegateList(opt); delegateList {
continue
}
opts = append(opts, opt)
}
}
}
}
}
for _, opt := range opts {
_, _, err := storage.ValidateListOptions(keyPrefix, storage.APIObjectVersioner{}, opt)
if err != nil {
t.Errorf("Expected List requests %+v to pass validation, got: %v", opt, err)
}
}
}

View File

@ -32,9 +32,9 @@ import (
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/delegator"
"k8s.io/apiserver/pkg/storage/cacher/metrics" "k8s.io/apiserver/pkg/storage/cacher/metrics"
"k8s.io/apiserver/pkg/storage/cacher/progress" "k8s.io/apiserver/pkg/storage/cacher/progress"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing" "k8s.io/component-base/tracing"
@ -496,8 +496,7 @@ func (s sortableStoreElements) Swap(i, j int) {
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along // WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
// with their ResourceVersion and the name of the index, if any, that was used. // with their ResourceVersion and the name of the index, if any, that was used.
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, opts storage.ListOptions) (resp listResp, index string, err error) { func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, opts storage.ListOptions) (resp listResp, index string, err error) {
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add() w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion) err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove() w.waitingUntilFresh.Remove()
@ -562,8 +561,7 @@ func (w *watchCache) notFresh(resourceVersion uint64) bool {
// WaitUntilFreshAndGet returns a pointers to <storeElement> object. // WaitUntilFreshAndGet returns a pointers to <storeElement> object.
func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) { func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
var err error var err error
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add() w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion) err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove() w.waitingUntilFresh.Remove()

View File

@ -23,10 +23,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
apirequest "k8s.io/apiserver/pkg/endpoints/request" apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage/cacher/delegator"
"k8s.io/apiserver/pkg/storage"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@ -85,7 +82,12 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
return WorkEstimate{InitialSeats: e.config.MinimumSeats} return WorkEstimate{InitialSeats: e.config.MinimumSeats}
} }
} }
listFromStorage, _ := shouldListFromStorage(&listOptions) // TODO: Check whether watchcache is enabled.
result, err := shouldDelegateList(&listOptions, delegator.CacheWithoutSnapshots{})
if err != nil {
return WorkEstimate{InitialSeats: maxSeats}
}
listFromStorage := result.ShouldDelegate
isListFromCache := requestInfo.Verb == "watch" || !listFromStorage isListFromCache := requestInfo.Verb == "watch" || !listFromStorage
numStored, err := e.countGetterFn(key(requestInfo)) numStored, err := e.countGetterFn(key(requestInfo))
@ -162,32 +164,30 @@ func key(requestInfo *apirequest.RequestInfo) string {
// NOTICE: Keep in sync with shouldDelegateList function in // NOTICE: Keep in sync with shouldDelegateList function in
// //
// staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go // staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go
func shouldListFromStorage(opts *metav1.ListOptions) (shouldDeletage, consistentRead bool) { func shouldDelegateList(opts *metav1.ListOptions, cache delegator.Helper) (delegator.Result, error) {
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
consistentRead = false
switch opts.ResourceVersionMatch { switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchExact: case metav1.ResourceVersionMatchExact:
return true, consistentRead return cache.ShouldDelegateExactRV(opts.ResourceVersion, defaultRecursive)
case metav1.ResourceVersionMatchNotOlderThan: case metav1.ResourceVersionMatchNotOlderThan:
return false, consistentRead return delegator.Result{ShouldDelegate: false}, nil
case "": case "":
// Legacy exact match // Legacy exact match
if opts.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" { if opts.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return true, consistentRead return cache.ShouldDelegateExactRV(opts.ResourceVersion, defaultRecursive)
} }
// Continue // Continue
if len(opts.Continue) > 0 { if len(opts.Continue) > 0 {
return true, consistentRead return cache.ShouldDelegateContinue(opts.Continue, defaultRecursive)
} }
// Consistent Read // Consistent Read
if opts.ResourceVersion == "" { if opts.ResourceVersion == "" {
consistentRead = true return cache.ShouldDelegateConsistentRead()
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
return !consistentListFromCacheEnabled || !requestWatchProgressSupported, consistentRead
} }
return false, consistentRead return delegator.Result{ShouldDelegate: false}, nil
default: default:
return true, consistentRead return delegator.Result{ShouldDelegate: true}, nil
} }
} }
var defaultRecursive = true