diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go index 63fb20bcb86..a3fa29f1a19 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go @@ -79,7 +79,7 @@ func StorageWithCacher() generic.StorageDecorator { }) } - return cacher, destroyFunc, nil + return cacherstorage.NewCacheProxy(cacher, s), destroyFunc, nil } } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go index 4127342e3d0..96cfe4a3870 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go @@ -2459,7 +2459,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE } } d := destroyFunc - s = cacher + s = cacherstorage.NewCacheProxy(cacher, s) destroyFunc = func() { cacher.Stop() d() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 970da6616a1..ec9cc0dcf7d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -339,10 +339,6 @@ type Cacher struct { expiredBookmarkWatchers []*cacheWatcher } -func (c *Cacher) RequestWatchProgress(ctx context.Context) error { - return c.storage.RequestWatchProgress(ctx) -} - // NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from // its internal cache and updating its cache in the background based on the // given configuration. @@ -495,56 +491,13 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) { } } -// Versioner implements storage.Interface. -func (c *Cacher) Versioner() storage.Versioner { - return c.storage.Versioner() -} - -// Create implements storage.Interface. -func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { - return c.storage.Create(ctx, key, obj, out, ttl) -} - -// Delete implements storage.Interface. -func (c *Cacher) Delete( - ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, - validateDeletion storage.ValidateObjectFunc, _ runtime.Object, opts storage.DeleteOptions) error { - // Ignore the suggestion and try to pass down the current version of the object - // read from cache. - if elem, exists, err := c.watchCache.GetByKey(key); err != nil { - klog.Errorf("GetByKey returned error: %v", err) - } else if exists { - // DeepCopy the object since we modify resource version when serializing the - // current object. - currObj := elem.(*storeElement).Object.DeepCopyObject() - return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, currObj, opts) - } - // If we couldn't get the object, fallback to no-suggestion. - return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil, opts) -} - type namespacedName struct { namespace string name string } -// Watch implements storage.Interface. func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { pred := opts.Predicate - // if the watch-list feature wasn't set and the resourceVersion is unset - // ensure that the rv from which the watch is being served, is the latest - // one. "latest" is ensured by serving the watch from - // the underlying storage. - // - // it should never happen due to our validation but let's just be super-safe here - // and disable sendingInitialEvents when the feature wasn't enabled - if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil { - opts.SendInitialEvents = nil - } - // TODO: we should eventually get rid of this legacy case - if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" { - return c.storage.Watch(ctx, key, opts) - } requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion) if err != nil { return nil, err @@ -709,58 +662,17 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions return watcher, nil } -// Get implements storage.Interface. func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { - ctx, span := tracing.Start(ctx, "cacher.Get", - attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)), - attribute.String("key", key), - attribute.String("resource-version", opts.ResourceVersion)) - defer span.End(500 * time.Millisecond) - if opts.ResourceVersion == "" { - // If resourceVersion is not specified, serve it from underlying - // storage (for backward compatibility). - span.AddEvent("About to Get from underlying storage") - return c.storage.Get(ctx, key, opts, objPtr) - } - - if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { - if !c.ready.check() { - // If Cache is not initialized, delegate Get requests to storage - // as described in https://kep.k8s.io/4568 - span.AddEvent("About to Get from underlying storage - cache not initialized") - return c.storage.Get(ctx, key, opts, objPtr) - } - } - - // If resourceVersion is specified, serve it from cache. - // It's guaranteed that the returned value is at least that - // fresh as the given resourceVersion. getRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion) if err != nil { return err } - // Do not create a trace - it's not for free and there are tons - // of Get requests. We can add it if it will be really needed. - - if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { - if getRV == 0 && !c.ready.check() { - // If Cacher is not yet initialized and we don't require any specific - // minimal resource version, simply forward the request to storage. - span.AddEvent("About to Get from underlying storage - cache not initialized and no resourceVersion set") - return c.storage.Get(ctx, key, opts, objPtr) - } - if err := c.ready.wait(ctx); err != nil { - return errors.NewServiceUnavailable(err.Error()) - } - } - objVal, err := conversion.EnforcePtr(objPtr) if err != nil { return err } - span.AddEvent("About to fetch object from cache") obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, getRV, key) if err != nil { return err @@ -853,32 +765,9 @@ type listResp struct { } // GetList implements storage.Interface -func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { +func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, listRV uint64) error { recursive := opts.Recursive - resourceVersion := opts.ResourceVersion pred := opts.Predicate - if shouldDelegateList(opts) { - return c.storage.GetList(ctx, key, opts, listObj) - } - - listRV, err := c.versioner.ParseResourceVersion(resourceVersion) - if err != nil { - return err - } - - if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { - if !c.ready.check() && shouldDelegateListOnNotReadyCache(opts) { - // If Cacher is not initialized, delegate List requests to storage - // as described in https://kep.k8s.io/4568 - return c.storage.GetList(ctx, key, opts, listObj) - } - } else { - if listRV == 0 && !c.ready.check() { - // If Cacher is not yet initialized and we don't require any specific - // minimal resource version, simply forward the request to storage. - return c.storage.GetList(ctx, key, opts, listObj) - } - } // For recursive lists, we need to make sure the key ended with "/" so that we only // get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys // with prefix "/a" will return all three, while with prefix "/a/" will return only @@ -887,14 +776,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio if opts.Recursive && !strings.HasSuffix(key, "/") { preparedKey += "/" } - requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) - consistentRead := resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported - if consistentRead { - listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String()) - if err != nil { - return err - } - } ctx, span := tracing.Start(ctx, "cacher.GetList", attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)), @@ -928,24 +809,9 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio } resp, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive) - success := "true" - fallback := "false" if err != nil { - if consistentRead { - if storage.IsTooLargeResourceVersion(err) { - fallback = "true" - err = c.storage.GetList(ctx, key, opts, listObj) - } - if err != nil { - success = "false" - } - metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1) - } return err } - if consistentRead { - metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1) - } span.AddEvent("Listed items from cache", attribute.Int("count", len(resp.Items))) // store pointer of eligible objects, // Why not directly put object in the items of listObj? @@ -995,37 +861,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio return nil } -// GuaranteedUpdate implements storage.Interface. -func (c *Cacher) GuaranteedUpdate( - ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, - preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ runtime.Object) error { - // Ignore the suggestion and try to pass down the current version of the object - // read from cache. - if elem, exists, err := c.watchCache.GetByKey(key); err != nil { - klog.Errorf("GetByKey returned error: %v", err) - } else if exists { - // DeepCopy the object since we modify resource version when serializing the - // current object. - currObj := elem.(*storeElement).Object.DeepCopyObject() - return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, currObj) - } - // If we couldn't get the object, fallback to no-suggestion. - return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil) -} - -// Count implements storage.Interface. -func (c *Cacher) Count(pathPrefix string) (int64, error) { - return c.storage.Count(pathPrefix) -} - -// ReadinessCheck implements storage.Interface. -func (c *Cacher) ReadinessCheck() error { - if !c.ready.check() { - return storage.ErrStorageNotReady - } - return nil -} - // baseObjectThreadUnsafe omits locking for cachingObject. func baseObjectThreadUnsafe(object runtime.Object) runtime.Object { if co, ok := object.(*cachingObject); ok { @@ -1502,6 +1337,10 @@ func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCach } } +func (c *Cacher) Ready() bool { + return c.ready.check() +} + // errWatcher implements watch.Interface to return a single error type errWatcher struct { result chan watch.Event diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index 8b98016db7f..57b857a21e4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -455,12 +455,12 @@ func withNodeNameAndNamespaceIndex(options *setupOptions) { } } -func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tearDownFunc) { +func testSetup(t *testing.T, opts ...setupOption) (context.Context, *CacheProxy, tearDownFunc) { ctx, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...) return ctx, cacher, tearDown } -func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context, *Cacher, *etcd3testing.EtcdTestServer, tearDownFunc) { +func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context, *CacheProxy, *etcd3testing.EtcdTestServer, tearDownFunc) { setupOpts := setupOptions{} opts = append([]setupOption{withDefaults}, opts...) for _, opt := range opts { @@ -514,31 +514,31 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context } } - return ctx, cacher, server, terminate + return ctx, NewCacheProxy(cacher, wrappedStorage), server, terminate } func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) { _, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...) if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { - if err := cacher.ready.wait(context.TODO()); err != nil { + if err := cacher.cacher.ready.wait(context.TODO()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } } - return &createWrapper{Cacher: cacher}, tearDown + return &createWrapper{CacheProxy: cacher}, tearDown } type createWrapper struct { - *Cacher + *CacheProxy } func (c *createWrapper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { - if err := c.Cacher.Create(ctx, key, obj, out, ttl); err != nil { + if err := c.CacheProxy.Create(ctx, key, obj, out, ttl); err != nil { return err } return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { - currentObj := c.Cacher.newFunc() - err := c.Cacher.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj) + currentObj := c.CacheProxy.cacher.newFunc() + err := c.CacheProxy.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj) if err != nil { if storage.IsNotFound(err) { return false, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go index a7f68b543ed..dae46e291a4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go @@ -78,7 +78,7 @@ func computePodKey(obj *example.Pod) string { return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name) } -func compactStorage(c *Cacher, client *clientv3.Client) storagetesting.Compaction { +func compactStorage(c *CacheProxy, client *clientv3.Client) storagetesting.Compaction { return func(ctx context.Context, t *testing.T, resourceVersion string) { versioner := storage.APIObjectVersioner{} rv, err := versioner.ParseResourceVersion(resourceVersion) @@ -86,37 +86,37 @@ func compactStorage(c *Cacher, client *clientv3.Client) storagetesting.Compactio t.Fatal(err) } - err = c.watchCache.waitUntilFreshAndBlock(context.TODO(), rv) + err = c.cacher.watchCache.waitUntilFreshAndBlock(context.TODO(), rv) if err != nil { t.Fatalf("WatchCache didn't caught up to RV: %v", rv) } - c.watchCache.RUnlock() + c.cacher.watchCache.RUnlock() - c.watchCache.Lock() - defer c.watchCache.Unlock() - c.Lock() - defer c.Unlock() + c.cacher.watchCache.Lock() + defer c.cacher.watchCache.Unlock() + c.cacher.Lock() + defer c.cacher.Unlock() - if c.watchCache.resourceVersion < rv { + if c.cacher.watchCache.resourceVersion < rv { t.Fatalf("Can't compact into a future version: %v", resourceVersion) } - if len(c.watchers.allWatchers) > 0 || len(c.watchers.valueWatchers) > 0 { + if len(c.cacher.watchers.allWatchers) > 0 || len(c.cacher.watchers.valueWatchers) > 0 { // We could consider terminating those watchers, but given // watchcache doesn't really support compaction and we don't // exercise it in tests, we just throw an error here. t.Error("Open watchers are not supported during compaction") } - for c.watchCache.startIndex < c.watchCache.endIndex { - index := c.watchCache.startIndex % c.watchCache.capacity - if c.watchCache.cache[index].ResourceVersion > rv { + for c.cacher.watchCache.startIndex < c.cacher.watchCache.endIndex { + index := c.cacher.watchCache.startIndex % c.cacher.watchCache.capacity + if c.cacher.watchCache.cache[index].ResourceVersion > rv { break } - c.watchCache.startIndex++ + c.cacher.watchCache.startIndex++ } - c.watchCache.listResourceVersion = rv + c.cacher.watchCache.listResourceVersion = rv if _, err = client.KV.Put(ctx, "compact_rev_key", resourceVersion); err != nil { t.Fatalf("Could not update compact_rev_key: %v", err) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index b6ea86fecdc..f542d44f6bf 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -310,15 +310,13 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp t.Fatalf("Couldn't create cacher: %v", err) } defer cacher.Stop() - + proxy := NewCacheProxy(cacher, backingStorage) result := &example.PodList{} - if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") } } - // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { currentResourceVersion := "42" @@ -337,11 +335,11 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp return nil } } - err = cacher.GetList(context.TODO(), "pods/ns", options, result) - if err != nil && err != errDummy { + err = proxy.GetList(context.TODO(), "pods/ns", options, result) + gotBypass := errors.Is(err, errDummy) + if err != nil && !gotBypass { t.Fatalf("Unexpected error for List request with options: %v, err: %v", options, err) } - gotBypass := err == errDummy if gotBypass != expectBypass { t.Errorf("Unexpected bypass result for List request with options %+v, bypass expected: %v, got: %v", options, expectBypass, gotBypass) } @@ -436,6 +434,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su t.Fatalf("Couldn't create cacher: %v", err) } defer cacher.Stop() + proxy := NewCacheProxy(cacher, backingStorage) if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { if err := cacher.ready.wait(context.Background()); err != nil { t.Fatalf("unexpected error waiting for the cache to be ready") @@ -461,7 +460,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su } result := &example.PodList{} start := cacher.clock.Now() - err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result) + err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result) duration := cacher.clock.Since(start) if (err != nil) != tc.expectError { t.Fatalf("Unexpected error err: %v", err) @@ -492,6 +491,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) { t.Fatalf("Couldn't create cacher: %v", err) } defer cacher.Stop() + proxy := NewCacheProxy(cacher, backingStorage) pred := storage.SelectionPredicate{ Limit: 500, @@ -506,7 +506,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) { // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.injectError(errDummy) - err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ + err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{ ResourceVersion: "0", Predicate: pred, }, result) @@ -514,11 +514,11 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) { t.Errorf("GetList with Limit and RV=0 should be served from cache: %v", err) } - err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ + err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{ ResourceVersion: "", Predicate: pred, }, result) - if err != errDummy { + if !errors.Is(err, errDummy) { t.Errorf("GetList with Limit without RV=0 should bypass cacher: %v", err) } } @@ -530,6 +530,7 @@ func TestGetCacheBypass(t *testing.T) { t.Fatalf("Couldn't create cacher: %v", err) } defer cacher.Stop() + proxy := NewCacheProxy(cacher, backingStorage) result := &example.Pod{} @@ -541,7 +542,7 @@ func TestGetCacheBypass(t *testing.T) { // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.injectError(errDummy) - err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ + err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ IgnoreNotFound: true, ResourceVersion: "0", }, result) @@ -549,11 +550,11 @@ func TestGetCacheBypass(t *testing.T) { t.Errorf("Get with RV=0 should be served from cache: %v", err) } - err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ + err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ IgnoreNotFound: true, ResourceVersion: "", }, result) - if err != errDummy { + if !errors.Is(err, errDummy) { t.Errorf("Get without RV=0 should bypass cacher: %v", err) } } @@ -565,6 +566,7 @@ func TestWatchCacheBypass(t *testing.T) { t.Fatalf("Couldn't create cacher: %v", err) } defer cacher.Stop() + proxy := NewCacheProxy(cacher, backingStorage) if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { if err := cacher.ready.wait(context.Background()); err != nil { @@ -572,7 +574,7 @@ func TestWatchCacheBypass(t *testing.T) { } } - _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ + _, err = proxy.Watch(context.TODO(), "pod/ns", storage.ListOptions{ ResourceVersion: "0", Predicate: storage.Everything, }) @@ -580,7 +582,7 @@ func TestWatchCacheBypass(t *testing.T) { t.Errorf("Watch with RV=0 should be served from cache: %v", err) } - _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ + _, err = proxy.Watch(context.TODO(), "pod/ns", storage.ListOptions{ ResourceVersion: "", Predicate: storage.Everything, }) @@ -589,7 +591,7 @@ func TestWatchCacheBypass(t *testing.T) { } featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false) - _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ + _, err = proxy.Watch(context.TODO(), "pod/ns", storage.ListOptions{ ResourceVersion: "", Predicate: storage.Everything, }) @@ -600,7 +602,7 @@ func TestWatchCacheBypass(t *testing.T) { // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.injectError(errDummy) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true) - _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ + _, err = proxy.Watch(context.TODO(), "pod/ns", storage.ListOptions{ ResourceVersion: "", Predicate: storage.Everything, }) @@ -621,6 +623,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) { t.Fatalf("Couldn't create cacher: %v", err) } defer cacher.Stop() + proxy := NewCacheProxy(cacher, backingStorage) opts := storage.ListOptions{ ResourceVersion: "0", @@ -632,7 +635,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) { defer listCancel() result := &example.PodList{} - err = cacher.GetList(listCtx, "/pods/ns", opts, result) + err = proxy.GetList(listCtx, "/pods/ns", opts, result) if err != nil && apierrors.IsTooManyRequests(err) { t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for List") } @@ -640,7 +643,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) { watchCtx, watchCancel := context.WithTimeout(context.Background(), 250*time.Millisecond) defer watchCancel() - _, err = cacher.Watch(watchCtx, "/pods/ns", opts) + _, err = proxy.Watch(watchCtx, "/pods/ns", opts) if err != nil && apierrors.IsTooManyRequests(err) { t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for Watch") } @@ -865,6 +868,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) { if err != nil { t.Fatalf("Couldn't create cacher: %v", err) } + proxy := NewCacheProxy(cacher, backingStorage) if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { if err := cacher.ready.wait(context.Background()); err != nil { @@ -872,7 +876,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) { } } - w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + w, err := proxy.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Failed to create watch: %v", err) } @@ -892,13 +896,13 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) { cacher.Stop() - _, err = cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + _, err = proxy.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err == nil { t.Fatalf("Success to create Watch: %v", err) } result := &example.Pod{} - err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ + err = proxy.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ IgnoreNotFound: true, ResourceVersion: "1", }, result) @@ -913,7 +917,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) { } listResult := &example.PodList{} - err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ + err = proxy.GetList(context.TODO(), "pods/ns", storage.ListOptions{ ResourceVersion: "1", Recursive: true, Predicate: storage.SelectionPredicate{ @@ -2274,11 +2278,13 @@ func BenchmarkCacher_GetList(b *testing.B) { } // build test cacher - cacher, _, err := newTestCacher(newObjectStorage(fakePods)) + store := newObjectStorage(fakePods) + cacher, _, err := newTestCacher(store) if err != nil { b.Fatalf("new cacher: %v", err) } defer cacher.Stop() + proxy := NewCacheProxy(cacher, store) // prepare result and pred parsedField, err := fields.ParseSelector("spec.nodeName=node-0") @@ -2294,7 +2300,7 @@ func BenchmarkCacher_GetList(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { result := &example.PodList{} - err = cacher.GetList(context.TODO(), "pods", storage.ListOptions{ + err = proxy.GetList(context.TODO(), "pods", storage.ListOptions{ Predicate: pred, Recursive: true, ResourceVersion: "12345", @@ -2847,11 +2853,11 @@ func TestWatchStreamSeparation(t *testing.T) { waitForEtcdBookmark := watchAndWaitForBookmark(t, waitContext, cacher.storage) var out example.Pod - err = cacher.Create(context.Background(), "foo", &example.Pod{}, &out, 0) + err = cacher.storage.Create(context.Background(), "foo", &example.Pod{}, &out, 0) if err != nil { t.Fatal(err) } - err = cacher.Delete(context.Background(), "foo", &out, nil, storage.ValidateAllObjectFunc, &example.Pod{}, storage.DeleteOptions{}) + err = cacher.storage.Delete(context.Background(), "foo", &out, nil, storage.ValidateAllObjectFunc, &example.Pod{}, storage.DeleteOptions{}) if err != nil { t.Fatal(err) } @@ -3132,7 +3138,7 @@ func TestListIndexer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { pred := storagetesting.CreatePodPredicate(tt.fieldSelector, true, tt.indexFields) - _, usedIndex, err := cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, pred, tt.recursive) + _, usedIndex, err := cacher.cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, pred, tt.recursive) if err != nil { t.Errorf("Unexpected error: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/proxy.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/proxy.go new file mode 100644 index 00000000000..b49544a71b2 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/proxy.go @@ -0,0 +1,217 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/audit" + "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/cacher/metrics" + etcdfeature "k8s.io/apiserver/pkg/storage/feature" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/component-base/tracing" + "k8s.io/klog/v2" +) + +func NewCacheProxy(cacher *Cacher, storage storage.Interface) *CacheProxy { + return &CacheProxy{ + cacher: cacher, + storage: storage, + } +} + +type CacheProxy struct { + cacher *Cacher + storage storage.Interface +} + +var _ storage.Interface = (*CacheProxy)(nil) + +func (c *CacheProxy) Versioner() storage.Versioner { + return c.storage.Versioner() +} + +func (c *CacheProxy) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { + return c.storage.Create(ctx, key, obj, out, ttl) +} + +func (c *CacheProxy) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error { + // Ignore the suggestion and try to pass down the current version of the object + // read from cache. + if elem, exists, err := c.cacher.watchCache.GetByKey(key); err != nil { + klog.Errorf("GetByKey returned error: %v", err) + } else if exists { + // DeepCopy the object since we modify resource version when serializing the + // current object. + currObj := elem.(*storeElement).Object.DeepCopyObject() + return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, currObj, opts) + } + // If we couldn't get the object, fallback to no-suggestion. + return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil, opts) +} + +func (c *CacheProxy) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + // if the watch-list feature wasn't set and the resourceVersion is unset + // ensure that the rv from which the watch is being served, is the latest + // one. "latest" is ensured by serving the watch from + // the underlying storage. + // + // it should never happen due to our validation but let's just be super-safe here + // and disable sendingInitialEvents when the feature wasn't enabled + if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil { + opts.SendInitialEvents = nil + } + // TODO: we should eventually get rid of this legacy case + if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" { + return c.storage.Watch(ctx, key, opts) + } + return c.cacher.Watch(ctx, key, opts) +} + +func (c *CacheProxy) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { + ctx, span := tracing.Start(ctx, "cacher.Get", + attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)), + attribute.String("key", key), + attribute.String("resource-version", opts.ResourceVersion)) + defer span.End(500 * time.Millisecond) + if opts.ResourceVersion == "" { + // If resourceVersion is not specified, serve it from underlying + // storage (for backward compatibility). + span.AddEvent("About to Get from underlying storage") + return c.storage.Get(ctx, key, opts, objPtr) + } + + if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if !c.cacher.Ready() { + // If Cache is not initialized, delegate Get requests to storage + // as described in https://kep.k8s.io/4568 + span.AddEvent("About to Get from underlying storage - cache not initialized") + return c.storage.Get(ctx, key, opts, objPtr) + } + } + // If resourceVersion is specified, serve it from cache. + // It's guaranteed that the returned value is at least that + // fresh as the given resourceVersion. + getRV, err := c.cacher.versioner.ParseResourceVersion(opts.ResourceVersion) + if err != nil { + return err + } + // Do not create a trace - it's not for free and there are tons + // of Get requests. We can add it if it will be really needed. + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if getRV == 0 && !c.cacher.Ready() { + // If Cacher is not yet initialized and we don't require any specific + // minimal resource version, simply forward the request to storage. + return c.storage.Get(ctx, key, opts, objPtr) + } + if err := c.cacher.ready.wait(ctx); err != nil { + return errors.NewServiceUnavailable(err.Error()) + } + } + span.AddEvent("About to fetch object from cache") + return c.cacher.Get(ctx, key, opts, objPtr) +} + +func (c *CacheProxy) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + if shouldDelegateList(opts) { + return c.storage.GetList(ctx, key, opts, listObj) + } + + listRV, err := c.cacher.versioner.ParseResourceVersion(opts.ResourceVersion) + if err != nil { + return err + } + + if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if !c.cacher.Ready() && shouldDelegateListOnNotReadyCache(opts) { + // If Cacher is not initialized, delegate List requests to storage + // as described in https://kep.k8s.io/4568 + return c.storage.GetList(ctx, key, opts, listObj) + } + } else { + if listRV == 0 && !c.cacher.Ready() { + // If Cacher is not yet initialized and we don't require any specific + // minimal resource version, simply forward the request to storage. + return c.storage.GetList(ctx, key, opts, listObj) + } + } + requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) + consistentRead := opts.ResourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported + if consistentRead { + listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.cacher.newListFunc, c.cacher.resourcePrefix, c.cacher.objectType.String()) + if err != nil { + return err + } + } + err = c.cacher.GetList(ctx, key, opts, listObj, listRV) + success := "true" + fallback := "false" + if err != nil { + if consistentRead { + if storage.IsTooLargeResourceVersion(err) { + fallback = "true" + err = c.storage.GetList(ctx, key, opts, listObj) + } + if err != nil { + success = "false" + } + metrics.ConsistentReadTotal.WithLabelValues(c.cacher.resourcePrefix, success, fallback).Add(1) + } + return err + } + if consistentRead { + metrics.ConsistentReadTotal.WithLabelValues(c.cacher.resourcePrefix, success, fallback).Add(1) + } + return nil +} + +func (c *CacheProxy) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error { + // Ignore the suggestion and try to pass down the current version of the object + // read from cache. + if elem, exists, err := c.cacher.watchCache.GetByKey(key); err != nil { + klog.Errorf("GetByKey returned error: %v", err) + } else if exists { + // DeepCopy the object since we modify resource version when serializing the + // current object. + currObj := elem.(*storeElement).Object.DeepCopyObject() + return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, currObj) + } + // If we couldn't get the object, fallback to no-suggestion. + return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil) +} + +func (c *CacheProxy) Count(pathPrefix string) (int64, error) { + return c.storage.Count(pathPrefix) +} + +func (c *CacheProxy) ReadinessCheck() error { + if !c.cacher.Ready() { + return storage.ErrStorageNotReady + } + return nil +} + +func (c *CacheProxy) RequestWatchProgress(ctx context.Context) error { + return c.storage.RequestWatchProgress(ctx) +}