diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index 5f6a3381a66..c6d8befefdf 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -208,6 +208,13 @@ const ( // // Allow the API server to stream individual items instead of chunking WatchList featuregate.Feature = "WatchList" + + // owner: @serathius + // kep: http://kep.k8s.io/2340 + // alpha: v1.28 + // + // Allow the API server to serve consistent lists from cache + ConsistentListFromCache featuregate.Feature = "ConsistentListFromCache" ) func init() { @@ -264,4 +271,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha}, WatchList: {Default: false, PreRelease: featuregate.Alpha}, + + ConsistentListFromCache: {Default: false, PreRelease: featuregate.Alpha}, } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index d89d574315b..0796f591d7f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -726,9 +726,10 @@ func shouldDelegateList(opts storage.ListOptions) bool { pred := opts.Predicate match := opts.ResourceVersionMatch pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) + consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) - // Serve consistent reads from storage - consistentReadFromStorage := resourceVersion == "" + // Serve consistent reads from storage if ConsistentListFromCache is disabled + consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled // Watch cache doesn't support continuations, so serve them from etcd. hasContinuation := pagingEnabled && len(pred.Continue) > 0 // Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd. @@ -762,19 +763,21 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio return c.storage.GetList(ctx, key, opts, listObj) } - // If resourceVersion is specified, serve it from cache. - // It's guaranteed that the returned value is at least that - // fresh as the given resourceVersion. listRV, err := c.versioner.ParseResourceVersion(resourceVersion) if err != nil { return err } - if listRV == 0 && !c.ready.check() { // If Cacher is not yet initialized and we don't require any specific // minimal resource version, simply forward the request to storage. return c.storage.GetList(ctx, key, opts, listObj) } + if listRV == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) { + listRV, err = c.getCurrentResourceVersionFromStorage(ctx) + if err != nil { + return err + } + } ctx, span := tracing.Start(ctx, "cacher list", attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)), diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index 018ecfdcdb0..2a8f2b7e6fa 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -31,9 +31,12 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" storagetesting "k8s.io/apiserver/pkg/storage/testing" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" ) @@ -150,6 +153,13 @@ func TestList(t *testing.T) { storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true) } +func TestListWithListFromCache(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)() + ctx, cacher, server, terminate := testSetupWithEtcdServer(t) + t.Cleanup(terminate) + storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true) +} + func TestListWithoutPaging(t *testing.T) { ctx, cacher, terminate := testSetup(t, withoutPaging) t.Cleanup(terminate) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index ae77e549767..ebaad698d73 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -162,8 +162,7 @@ func TestGetListCacheBypass(t *testing.T) { opts storage.ListOptions expectBypass bool } - testCases := []testCase{ - {opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true}, + commonTestCases := []testCase{ {opts: storage.ListOptions{ResourceVersion: "0"}, expectBypass: false}, {opts: storage.ListOptions{ResourceVersion: "1"}, expectBypass: false}, @@ -180,9 +179,25 @@ func TestGetListCacheBypass(t *testing.T) { {opts: storage.ListOptions{ResourceVersion: "1", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, expectBypass: true}, } - for _, tc := range testCases { - testGetListCacheBypass(t, tc.opts, tc.expectBypass) - } + t.Run("ConsistentListFromStorage", func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)() + testCases := append(commonTestCases, + testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true}, + ) + for _, tc := range testCases { + testGetListCacheBypass(t, tc.opts, tc.expectBypass) + } + + }) + t.Run("ConsistentListFromCache", func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)() + testCases := append(commonTestCases, + testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: false}, + ) + for _, tc := range testCases { + testGetListCacheBypass(t, tc.opts, tc.expectBypass) + } + }) } func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectBypass bool) { @@ -200,7 +215,23 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp t.Fatalf("unexpected error waiting for the cache to be ready") } // Inject error to underlying layer and check if cacher is not bypassed. - backingStorage.injectError(errDummy) + backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + currentResourceVersion := "42" + switch { + // request made by getCurrentResourceVersionFromStorage by checking Limit + case key == cacher.resourcePrefix: + podList := listObj.(*example.PodList) + podList.ResourceVersion = currentResourceVersion + return nil + // request made by storage.GetList with revision from original request or + // returned by getCurrentResourceVersionFromStorage + case opts.ResourceVersion == options.ResourceVersion || opts.ResourceVersion == currentResourceVersion: + return errDummy + default: + t.Fatalf("Unexpected request %+v", opts) + return nil + } + } err = cacher.GetList(context.TODO(), "pods/ns", options, result) if err != nil && err != errDummy { t.Fatalf("Unexpected error for List request with options: %v, err: %v", options, err) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index bbffffc1900..c26eb55dac4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -30,8 +30,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/cacher/metrics" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" "k8s.io/component-base/tracing" "k8s.io/klog/v2" @@ -412,6 +414,7 @@ func (w *watchCache) UpdateResourceVersion(resourceVersion string) { w.Lock() defer w.Unlock() w.resourceVersion = rv + w.cond.Broadcast() }() // Avoid calling event handler under lock. @@ -490,7 +493,14 @@ func (s sortableStoreElements) Swap(i, j int) { // WaitUntilFreshAndList returns list of pointers to `storeElement` objects along // with their ResourceVersion and the name of the index, if any, that was used. func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) { - err := w.waitUntilFreshAndBlock(ctx, resourceVersion) + var err error + if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && w.notFresh(resourceVersion) { + w.waitingUntilFresh.Add() + err = w.waitUntilFreshAndBlock(ctx, resourceVersion) + w.waitingUntilFresh.Remove() + } else { + err = w.waitUntilFreshAndBlock(ctx, resourceVersion) + } defer w.RUnlock() if err != nil { return nil, 0, "", err @@ -513,6 +523,12 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion return result, rv, index, err } +func (w *watchCache) notFresh(resourceVersion uint64) bool { + w.RLock() + defer w.RUnlock() + return resourceVersion > w.resourceVersion +} + // WaitUntilFreshAndGet returns a pointers to object. func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) { err := w.waitUntilFreshAndBlock(ctx, resourceVersion) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 3a772f347f7..6809225d76d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -34,8 +34,11 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" testingclock "k8s.io/utils/clock/testing" ) @@ -516,6 +519,33 @@ func TestWaitUntilFreshAndList(t *testing.T) { } } +func TestWaitUntilFreshAndListFromCache(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)() + ctx := context.Background() + store := newTestWatchCache(3, &cache.Indexers{}) + defer store.Stop() + // In background, update the store. + go func() { + store.Add(makeTestPod("pod1", 2)) + store.bookmarkRevision <- 3 + }() + + // list from future revision. Requires watch cache to request bookmark to get it. + list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resourceVersion != 3 { + t.Errorf("unexpected resourceVersion: %v, expected: 6", resourceVersion) + } + if len(list) != 1 { + t.Errorf("unexpected list returned: %#v", list) + } + if indexUsed != "" { + t.Errorf("Used index %q but expected none to be used", indexUsed) + } +} + func TestWaitUntilFreshAndGet(t *testing.T) { ctx := context.Background() store := newTestWatchCache(3, &cache.Indexers{}) @@ -544,31 +574,51 @@ func TestWaitUntilFreshAndGet(t *testing.T) { } func TestWaitUntilFreshAndListTimeout(t *testing.T) { - ctx := context.Background() - store := newTestWatchCache(3, &cache.Indexers{}) - defer store.Stop() - fc := store.clock.(*testingclock.FakeClock) - - // In background, step clock after the below call starts the timer. - go func() { - for !fc.HasWaiters() { - time.Sleep(time.Millisecond) - } - fc.Step(blockTimeout) - - // Add an object to make sure the test would - // eventually fail instead of just waiting - // forever. - time.Sleep(30 * time.Second) - store.Add(makeTestPod("bar", 5)) - }() - - _, _, _, err := store.WaitUntilFreshAndList(ctx, 5, nil) - if !errors.IsTimeout(err) { - t.Errorf("expected timeout error but got: %v", err) + tcs := []struct { + name string + ConsistentListFromCache bool + }{ + { + name: "FromStorage", + ConsistentListFromCache: false, + }, + { + name: "FromCache", + ConsistentListFromCache: true, + }, } - if !storage.IsTooLargeResourceVersion(err) { - t.Errorf("expected 'Too large resource version' cause in error but got: %v", err) + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, tc.ConsistentListFromCache)() + ctx := context.Background() + store := newTestWatchCache(3, &cache.Indexers{}) + defer store.Stop() + fc := store.clock.(*testingclock.FakeClock) + + // In background, step clock after the below call starts the timer. + go func() { + for !fc.HasWaiters() { + time.Sleep(time.Millisecond) + } + store.Add(makeTestPod("foo", 2)) + store.bookmarkRevision <- 3 + fc.Step(blockTimeout) + + // Add an object to make sure the test would + // eventually fail instead of just waiting + // forever. + time.Sleep(30 * time.Second) + store.Add(makeTestPod("bar", 4)) + }() + + _, _, _, err := store.WaitUntilFreshAndList(ctx, 4, nil) + if !errors.IsTimeout(err) { + t.Errorf("expected timeout error but got: %v", err) + } + if !storage.IsTooLargeResourceVersion(err) { + t.Errorf("expected 'Too large resource version' cause in error but got: %v", err) + } + }) } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go index 22f556d2506..90f4c381e3b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go @@ -149,9 +149,10 @@ func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool { resourceVersion := opts.ResourceVersion match := opts.ResourceVersionMatch pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) + consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) - // Serve consistent reads from storage - consistentReadFromStorage := resourceVersion == "" + // Serve consistent reads from storage if ConsistentListFromCache is disabled + consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled // Watch cache doesn't support continuations, so serve them from etcd. hasContinuation := pagingEnabled && len(opts.Continue) > 0 // Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.