mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Merge pull request #130399 from serathius/cache-delegator
Rename CacheProxy to CacheDelegator
This commit is contained in:
commit
6ff0354c15
@ -79,7 +79,7 @@ func StorageWithCacher() generic.StorageDecorator {
|
||||
})
|
||||
}
|
||||
|
||||
return cacherstorage.NewCacheProxy(cacher, s), destroyFunc, nil
|
||||
return cacherstorage.NewCacheDelegator(cacher, s), destroyFunc, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2459,7 +2459,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
|
||||
}
|
||||
}
|
||||
d := destroyFunc
|
||||
s = cacherstorage.NewCacheProxy(cacher, s)
|
||||
s = cacherstorage.NewCacheDelegator(cacher, s)
|
||||
destroyFunc = func() {
|
||||
cacher.Stop()
|
||||
d()
|
||||
|
@ -694,36 +694,6 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
|
||||
return nil
|
||||
}
|
||||
|
||||
// NOTICE: Keep in sync with shouldListFromStorage function in
|
||||
//
|
||||
// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go
|
||||
func shouldDelegateList(opts storage.ListOptions) bool {
|
||||
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
|
||||
switch opts.ResourceVersionMatch {
|
||||
case metav1.ResourceVersionMatchExact:
|
||||
return true
|
||||
case metav1.ResourceVersionMatchNotOlderThan:
|
||||
case "":
|
||||
// Legacy exact match
|
||||
if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
|
||||
return true
|
||||
}
|
||||
default:
|
||||
return true
|
||||
}
|
||||
// Continue
|
||||
if len(opts.Predicate.Continue) > 0 {
|
||||
return true
|
||||
}
|
||||
// Consistent Read
|
||||
if opts.ResourceVersion == "" {
|
||||
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
|
||||
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
||||
return !consistentListFromCacheEnabled || !requestWatchProgressSupported
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// computeListLimit determines whether the cacher should
|
||||
// apply a limit to an incoming LIST request and returns its value.
|
||||
//
|
||||
@ -738,14 +708,6 @@ func computeListLimit(opts storage.ListOptions) int64 {
|
||||
return opts.Predicate.Limit
|
||||
}
|
||||
|
||||
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
|
||||
pred := opts.Predicate
|
||||
noLabelSelector := pred.Label == nil || pred.Label.Empty()
|
||||
noFieldSelector := pred.Field == nil || pred.Field.Empty()
|
||||
hasLimit := pred.Limit > 0
|
||||
return noLabelSelector && noFieldSelector && hasLimit
|
||||
}
|
||||
|
||||
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) (listResp, string, error) {
|
||||
if !recursive {
|
||||
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key)
|
||||
|
@ -455,12 +455,12 @@ func withNodeNameAndNamespaceIndex(options *setupOptions) {
|
||||
}
|
||||
}
|
||||
|
||||
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *CacheProxy, tearDownFunc) {
|
||||
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *CacheDelegator, tearDownFunc) {
|
||||
ctx, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
|
||||
return ctx, cacher, tearDown
|
||||
}
|
||||
|
||||
func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context, *CacheProxy, *etcd3testing.EtcdTestServer, tearDownFunc) {
|
||||
func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context, *CacheDelegator, *etcd3testing.EtcdTestServer, tearDownFunc) {
|
||||
setupOpts := setupOptions{}
|
||||
opts = append([]setupOption{withDefaults}, opts...)
|
||||
for _, opt := range opts {
|
||||
@ -514,7 +514,7 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
|
||||
}
|
||||
}
|
||||
|
||||
return ctx, NewCacheProxy(cacher, wrappedStorage), server, terminate
|
||||
return ctx, NewCacheDelegator(cacher, wrappedStorage), server, terminate
|
||||
}
|
||||
|
||||
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
|
||||
@ -525,20 +525,20 @@ func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (stora
|
||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||
}
|
||||
}
|
||||
return &createWrapper{CacheProxy: cacher}, tearDown
|
||||
return &createWrapper{CacheDelegator: cacher}, tearDown
|
||||
}
|
||||
|
||||
type createWrapper struct {
|
||||
*CacheProxy
|
||||
*CacheDelegator
|
||||
}
|
||||
|
||||
func (c *createWrapper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
|
||||
if err := c.CacheProxy.Create(ctx, key, obj, out, ttl); err != nil {
|
||||
if err := c.CacheDelegator.Create(ctx, key, obj, out, ttl); err != nil {
|
||||
return err
|
||||
}
|
||||
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
|
||||
currentObj := c.CacheProxy.cacher.newFunc()
|
||||
err := c.CacheProxy.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj)
|
||||
currentObj := c.CacheDelegator.cacher.newFunc()
|
||||
err := c.CacheDelegator.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj)
|
||||
if err != nil {
|
||||
if storage.IsNotFound(err) {
|
||||
return false, nil
|
||||
|
@ -78,7 +78,7 @@ func computePodKey(obj *example.Pod) string {
|
||||
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
|
||||
}
|
||||
|
||||
func compactStorage(c *CacheProxy, client *clientv3.Client) storagetesting.Compaction {
|
||||
func compactStorage(c *CacheDelegator, client *clientv3.Client) storagetesting.Compaction {
|
||||
return func(ctx context.Context, t *testing.T, resourceVersion string) {
|
||||
versioner := storage.APIObjectVersioner{}
|
||||
rv, err := versioner.ParseResourceVersion(resourceVersion)
|
||||
|
@ -317,7 +317,7 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
defer cacher.Stop()
|
||||
proxy := NewCacheProxy(cacher, backingStorage)
|
||||
delegator := NewCacheDelegator(cacher, backingStorage)
|
||||
result := &example.PodList{}
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||
@ -342,7 +342,7 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
|
||||
return nil
|
||||
}
|
||||
}
|
||||
err = proxy.GetList(context.TODO(), "pods/ns", options, result)
|
||||
err = delegator.GetList(context.TODO(), "pods/ns", options, result)
|
||||
gotBypass := errors.Is(err, errDummy)
|
||||
if err != nil && !gotBypass {
|
||||
t.Fatalf("Unexpected error for List request with options: %v, err: %v", options, err)
|
||||
@ -441,7 +441,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
defer cacher.Stop()
|
||||
proxy := NewCacheProxy(cacher, backingStorage)
|
||||
delegator := NewCacheDelegator(cacher, backingStorage)
|
||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||
}
|
||||
@ -484,7 +484,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
|
||||
}
|
||||
|
||||
start := cacher.clock.Now()
|
||||
err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result)
|
||||
err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result)
|
||||
clockStepCancelFn()
|
||||
duration := cacher.clock.Since(start)
|
||||
if (err != nil) != tc.expectError {
|
||||
@ -516,7 +516,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
defer cacher.Stop()
|
||||
proxy := NewCacheProxy(cacher, backingStorage)
|
||||
delegator := NewCacheDelegator(cacher, backingStorage)
|
||||
|
||||
pred := storage.SelectionPredicate{
|
||||
Limit: 500,
|
||||
@ -531,7 +531,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
|
||||
|
||||
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||
backingStorage.injectError(errDummy)
|
||||
err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{
|
||||
err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
Predicate: pred,
|
||||
}, result)
|
||||
@ -539,7 +539,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
|
||||
t.Errorf("GetList with Limit and RV=0 should be served from cache: %v", err)
|
||||
}
|
||||
|
||||
err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{
|
||||
err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{
|
||||
ResourceVersion: "",
|
||||
Predicate: pred,
|
||||
}, result)
|
||||
@ -555,7 +555,7 @@ func TestGetCacheBypass(t *testing.T) {
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
defer cacher.Stop()
|
||||
proxy := NewCacheProxy(cacher, backingStorage)
|
||||
delegator := NewCacheDelegator(cacher, backingStorage)
|
||||
|
||||
result := &example.Pod{}
|
||||
|
||||
@ -567,7 +567,7 @@ func TestGetCacheBypass(t *testing.T) {
|
||||
|
||||
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||
backingStorage.injectError(errDummy)
|
||||
err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
|
||||
err = delegator.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
|
||||
IgnoreNotFound: true,
|
||||
ResourceVersion: "0",
|
||||
}, result)
|
||||
@ -575,7 +575,7 @@ func TestGetCacheBypass(t *testing.T) {
|
||||
t.Errorf("Get with RV=0 should be served from cache: %v", err)
|
||||
}
|
||||
|
||||
err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
|
||||
err = delegator.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
|
||||
IgnoreNotFound: true,
|
||||
ResourceVersion: "",
|
||||
}, result)
|
||||
@ -591,7 +591,7 @@ func TestWatchCacheBypass(t *testing.T) {
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
defer cacher.Stop()
|
||||
proxy := NewCacheProxy(cacher, backingStorage)
|
||||
delegator := NewCacheDelegator(cacher, backingStorage)
|
||||
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||
@ -599,7 +599,7 @@ func TestWatchCacheBypass(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
_, err = proxy.Watch(context.TODO(), "pod/ns", storage.ListOptions{
|
||||
_, err = delegator.Watch(context.TODO(), "pod/ns", storage.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
Predicate: storage.Everything,
|
||||
})
|
||||
@ -607,7 +607,7 @@ func TestWatchCacheBypass(t *testing.T) {
|
||||
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
|
||||
}
|
||||
|
||||
_, err = proxy.Watch(context.TODO(), "pod/ns", storage.ListOptions{
|
||||
_, err = delegator.Watch(context.TODO(), "pod/ns", storage.ListOptions{
|
||||
ResourceVersion: "",
|
||||
Predicate: storage.Everything,
|
||||
})
|
||||
@ -628,7 +628,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) {
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
defer cacher.Stop()
|
||||
proxy := NewCacheProxy(cacher, backingStorage)
|
||||
delegator := NewCacheDelegator(cacher, backingStorage)
|
||||
|
||||
opts := storage.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
@ -640,7 +640,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) {
|
||||
defer listCancel()
|
||||
|
||||
result := &example.PodList{}
|
||||
err = proxy.GetList(listCtx, "/pods/ns", opts, result)
|
||||
err = delegator.GetList(listCtx, "/pods/ns", opts, result)
|
||||
if err != nil && apierrors.IsTooManyRequests(err) {
|
||||
t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for List")
|
||||
}
|
||||
@ -648,7 +648,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) {
|
||||
watchCtx, watchCancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
|
||||
defer watchCancel()
|
||||
|
||||
_, err = proxy.Watch(watchCtx, "/pods/ns", opts)
|
||||
_, err = delegator.Watch(watchCtx, "/pods/ns", opts)
|
||||
if err != nil && apierrors.IsTooManyRequests(err) {
|
||||
t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for Watch")
|
||||
}
|
||||
@ -873,7 +873,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
proxy := NewCacheProxy(cacher, backingStorage)
|
||||
delegator := NewCacheDelegator(cacher, backingStorage)
|
||||
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||
@ -881,7 +881,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
w, err := proxy.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||
w, err := delegator.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create watch: %v", err)
|
||||
}
|
||||
@ -901,13 +901,13 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
|
||||
|
||||
cacher.Stop()
|
||||
|
||||
_, err = proxy.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||
_, err = delegator.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||
if err == nil {
|
||||
t.Fatalf("Success to create Watch: %v", err)
|
||||
}
|
||||
|
||||
result := &example.Pod{}
|
||||
err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
|
||||
err = delegator.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
|
||||
IgnoreNotFound: true,
|
||||
ResourceVersion: "1",
|
||||
}, result)
|
||||
@ -922,7 +922,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
|
||||
}
|
||||
|
||||
listResult := &example.PodList{}
|
||||
err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{
|
||||
err = delegator.GetList(context.TODO(), "pods/ns", storage.ListOptions{
|
||||
ResourceVersion: "1",
|
||||
Recursive: true,
|
||||
Predicate: storage.SelectionPredicate{
|
||||
@ -2291,7 +2291,7 @@ func BenchmarkCacher_GetList(b *testing.B) {
|
||||
b.Fatalf("new cacher: %v", err)
|
||||
}
|
||||
defer cacher.Stop()
|
||||
proxy := NewCacheProxy(cacher, store)
|
||||
delegator := NewCacheDelegator(cacher, store)
|
||||
|
||||
// prepare result and pred
|
||||
parsedField, err := fields.ParseSelector("spec.nodeName=node-0")
|
||||
@ -2307,7 +2307,7 @@ func BenchmarkCacher_GetList(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
result := &example.PodList{}
|
||||
err = proxy.GetList(context.TODO(), "pods", storage.ListOptions{
|
||||
err = delegator.GetList(context.TODO(), "pods", storage.ListOptions{
|
||||
Predicate: pred,
|
||||
Recursive: true,
|
||||
ResourceVersion: "12345",
|
||||
@ -3179,8 +3179,8 @@ func TestRetryAfterForUnreadyCache(t *testing.T) {
|
||||
Predicate: storage.Everything,
|
||||
}
|
||||
result := &example.PodList{}
|
||||
proxy := NewCacheProxy(cacher, backingStorage)
|
||||
err = proxy.GetList(context.TODO(), "/pods/ns", opts, result)
|
||||
delegator := NewCacheDelegator(cacher, backingStorage)
|
||||
err = delegator.GetList(context.TODO(), "/pods/ns", opts, result)
|
||||
|
||||
if !apierrors.IsTooManyRequests(err) {
|
||||
t.Fatalf("Unexpected GetList error: %v", err)
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/audit"
|
||||
@ -34,29 +35,29 @@ import (
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
func NewCacheProxy(cacher *Cacher, storage storage.Interface) *CacheProxy {
|
||||
return &CacheProxy{
|
||||
func NewCacheDelegator(cacher *Cacher, storage storage.Interface) *CacheDelegator {
|
||||
return &CacheDelegator{
|
||||
cacher: cacher,
|
||||
storage: storage,
|
||||
}
|
||||
}
|
||||
|
||||
type CacheProxy struct {
|
||||
type CacheDelegator struct {
|
||||
cacher *Cacher
|
||||
storage storage.Interface
|
||||
}
|
||||
|
||||
var _ storage.Interface = (*CacheProxy)(nil)
|
||||
var _ storage.Interface = (*CacheDelegator)(nil)
|
||||
|
||||
func (c *CacheProxy) Versioner() storage.Versioner {
|
||||
func (c *CacheDelegator) Versioner() storage.Versioner {
|
||||
return c.storage.Versioner()
|
||||
}
|
||||
|
||||
func (c *CacheProxy) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
|
||||
func (c *CacheDelegator) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
|
||||
return c.storage.Create(ctx, key, obj, out, ttl)
|
||||
}
|
||||
|
||||
func (c *CacheProxy) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error {
|
||||
func (c *CacheDelegator) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error {
|
||||
// Ignore the suggestion and try to pass down the current version of the object
|
||||
// read from cache.
|
||||
if elem, exists, err := c.cacher.watchCache.GetByKey(key); err != nil {
|
||||
@ -71,7 +72,7 @@ func (c *CacheProxy) Delete(ctx context.Context, key string, out runtime.Object,
|
||||
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil, opts)
|
||||
}
|
||||
|
||||
func (c *CacheProxy) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
|
||||
func (c *CacheDelegator) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
|
||||
// if the watch-list feature wasn't set and the resourceVersion is unset
|
||||
// ensure that the rv from which the watch is being served, is the latest
|
||||
// one. "latest" is ensured by serving the watch from
|
||||
@ -89,7 +90,7 @@ func (c *CacheProxy) Watch(ctx context.Context, key string, opts storage.ListOpt
|
||||
return c.cacher.Watch(ctx, key, opts)
|
||||
}
|
||||
|
||||
func (c *CacheProxy) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
|
||||
func (c *CacheDelegator) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
|
||||
ctx, span := tracing.Start(ctx, "cacher.Get",
|
||||
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
|
||||
attribute.String("key", key),
|
||||
@ -104,7 +105,7 @@ func (c *CacheProxy) Get(ctx context.Context, key string, opts storage.GetOption
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||
if !c.cacher.Ready() {
|
||||
// If Cache is not initialized, delegate Get requests to storage
|
||||
// If Cache is not initialized, delegator Get requests to storage
|
||||
// as described in https://kep.k8s.io/4568
|
||||
span.AddEvent("About to Get from underlying storage - cache not initialized")
|
||||
return c.storage.Get(ctx, key, opts, objPtr)
|
||||
@ -133,7 +134,7 @@ func (c *CacheProxy) Get(ctx context.Context, key string, opts storage.GetOption
|
||||
return c.cacher.Get(ctx, key, opts, objPtr)
|
||||
}
|
||||
|
||||
func (c *CacheProxy) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||
func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||
if shouldDelegateList(opts) {
|
||||
return c.storage.GetList(ctx, key, opts, listObj)
|
||||
}
|
||||
@ -145,7 +146,7 @@ func (c *CacheProxy) GetList(ctx context.Context, key string, opts storage.ListO
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||
if !c.cacher.Ready() && shouldDelegateListOnNotReadyCache(opts) {
|
||||
// If Cacher is not initialized, delegate List requests to storage
|
||||
// If Cacher is not initialized, delegator List requests to storage
|
||||
// as described in https://kep.k8s.io/4568
|
||||
return c.storage.GetList(ctx, key, opts, listObj)
|
||||
}
|
||||
@ -186,7 +187,45 @@ func (c *CacheProxy) GetList(ctx context.Context, key string, opts storage.ListO
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *CacheProxy) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
|
||||
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
|
||||
pred := opts.Predicate
|
||||
noLabelSelector := pred.Label == nil || pred.Label.Empty()
|
||||
noFieldSelector := pred.Field == nil || pred.Field.Empty()
|
||||
hasLimit := pred.Limit > 0
|
||||
return noLabelSelector && noFieldSelector && hasLimit
|
||||
}
|
||||
|
||||
// NOTICE: Keep in sync with shouldListFromStorage function in
|
||||
//
|
||||
// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go
|
||||
func shouldDelegateList(opts storage.ListOptions) bool {
|
||||
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
|
||||
switch opts.ResourceVersionMatch {
|
||||
case metav1.ResourceVersionMatchExact:
|
||||
return true
|
||||
case metav1.ResourceVersionMatchNotOlderThan:
|
||||
case "":
|
||||
// Legacy exact match
|
||||
if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
|
||||
return true
|
||||
}
|
||||
default:
|
||||
return true
|
||||
}
|
||||
// Continue
|
||||
if len(opts.Predicate.Continue) > 0 {
|
||||
return true
|
||||
}
|
||||
// Consistent Read
|
||||
if opts.ResourceVersion == "" {
|
||||
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
|
||||
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
||||
return !consistentListFromCacheEnabled || !requestWatchProgressSupported
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *CacheDelegator) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
|
||||
// Ignore the suggestion and try to pass down the current version of the object
|
||||
// read from cache.
|
||||
if elem, exists, err := c.cacher.watchCache.GetByKey(key); err != nil {
|
||||
@ -201,17 +240,17 @@ func (c *CacheProxy) GuaranteedUpdate(ctx context.Context, key string, destinati
|
||||
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil)
|
||||
}
|
||||
|
||||
func (c *CacheProxy) Count(pathPrefix string) (int64, error) {
|
||||
func (c *CacheDelegator) Count(pathPrefix string) (int64, error) {
|
||||
return c.storage.Count(pathPrefix)
|
||||
}
|
||||
|
||||
func (c *CacheProxy) ReadinessCheck() error {
|
||||
func (c *CacheDelegator) ReadinessCheck() error {
|
||||
if !c.cacher.Ready() {
|
||||
return storage.ErrStorageNotReady
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *CacheProxy) RequestWatchProgress(ctx context.Context) error {
|
||||
func (c *CacheDelegator) RequestWatchProgress(ctx context.Context) error {
|
||||
return c.storage.RequestWatchProgress(ctx)
|
||||
}
|
@ -162,7 +162,7 @@ func key(requestInfo *apirequest.RequestInfo) string {
|
||||
|
||||
// NOTICE: Keep in sync with shouldDelegateList function in
|
||||
//
|
||||
// staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
|
||||
// staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go
|
||||
func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool {
|
||||
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
|
||||
switch opts.ResourceVersionMatch {
|
||||
|
Loading…
Reference in New Issue
Block a user