diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index c553045814d..48791bd7b63 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -848,7 +848,8 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio preparedKey += "/" } requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) - if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported { + consistentRead := resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported + if consistentRead { listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String()) if err != nil { return err @@ -887,9 +888,24 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio } objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive) + success := "true" + fallback := "false" if err != nil { + if consistentRead { + if storage.IsTooLargeResourceVersion(err) { + fallback = "true" + err = c.storage.GetList(ctx, key, opts, listObj) + } + if err != nil { + success = "false" + } + metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1) + } return err } + if consistentRead { + metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1) + } span.AddEvent("Listed items from cache", attribute.Int("count", len(objs))) // store pointer of eligible objects, // Why not directly put object in the items of listObj? diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 3520da9d0e5..fa7257f216d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -24,6 +24,7 @@ import ( "reflect" goruntime "runtime" "strconv" + "strings" "sync" "testing" "time" @@ -45,10 +46,13 @@ import ( examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/cacher/metrics" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" etcdfeature "k8s.io/apiserver/pkg/storage/feature" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" + k8smetrics "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/testutil" "k8s.io/utils/clock" testingclock "k8s.io/utils/clock/testing" "k8s.io/utils/pointer" @@ -288,6 +292,138 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp } } +func TestConsistentReadFallback(t *testing.T) { + tcs := []struct { + name string + consistentReadsEnabled bool + watchCacheRV string + storageRV string + fallbackError bool + + expectError bool + expectRV string + expectBlock bool + expectRequestsToStorage int + expectMetric string + }{ + { + name: "Success", + consistentReadsEnabled: true, + watchCacheRV: "42", + storageRV: "42", + expectRV: "42", + expectRequestsToStorage: 1, + expectMetric: ` +# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache. +# TYPE apiserver_watch_cache_consistent_read_total counter +apiserver_watch_cache_consistent_read_total{fallback="false", resource="pods", success="true"} 1 +`, + }, + { + name: "Fallback", + consistentReadsEnabled: true, + watchCacheRV: "2", + storageRV: "42", + expectRV: "42", + expectBlock: true, + expectRequestsToStorage: 2, + expectMetric: ` +# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache. +# TYPE apiserver_watch_cache_consistent_read_total counter +apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", success="true"} 1 +`, + }, + { + name: "Fallback Failure", + consistentReadsEnabled: true, + watchCacheRV: "2", + storageRV: "42", + fallbackError: true, + expectError: true, + expectBlock: true, + expectRequestsToStorage: 2, + expectMetric: ` +# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache. +# TYPE apiserver_watch_cache_consistent_read_total counter +apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", success="false"} 1 +`, + }, + { + name: "Disabled", + watchCacheRV: "2", + storageRV: "42", + expectRV: "42", + expectRequestsToStorage: 1, + expectMetric: ``, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, tc.consistentReadsEnabled) + if tc.consistentReadsEnabled { + forceRequestWatchProgressSupport(t) + } + + registry := k8smetrics.NewKubeRegistry() + metrics.ConsistentReadTotal.Reset() + if err := registry.Register(metrics.ConsistentReadTotal); err != nil { + t.Errorf("unexpected error: %v", err) + } + backingStorage := &dummyStorage{} + backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + podList := listObj.(*example.PodList) + podList.ResourceVersion = tc.watchCacheRV + return nil + } + // TODO: Use fake clock for this test to reduce execution time. + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + if fmt.Sprintf("%d", cacher.watchCache.resourceVersion) != tc.watchCacheRV { + t.Fatalf("Expected watch cache RV to equal watchCacheRV, got: %d, want: %s", cacher.watchCache.resourceVersion, tc.watchCacheRV) + } + requestToStorageCount := 0 + backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + requestToStorageCount += 1 + podList := listObj.(*example.PodList) + if key == cacher.resourcePrefix { + podList.ResourceVersion = tc.storageRV + return nil + } + if tc.fallbackError { + return errDummy + } + podList.ResourceVersion = tc.storageRV + return nil + } + result := &example.PodList{} + start := cacher.clock.Now() + err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result) + duration := cacher.clock.Since(start) + if (err != nil) != tc.expectError { + t.Fatalf("Unexpected error err: %v", err) + } + if result.ResourceVersion != tc.expectRV { + t.Fatalf("Unexpected List response RV, got: %q, want: %q", result.ResourceVersion, tc.expectRV) + } + if requestToStorageCount != tc.expectRequestsToStorage { + t.Fatalf("Unexpected number of requests to storage, got: %d, want: %d", requestToStorageCount, tc.expectRequestsToStorage) + } + blocked := duration >= blockTimeout + if blocked != tc.expectBlock { + t.Fatalf("Unexpected block, got: %v, want: %v", blocked, tc.expectBlock) + } + + if err := testutil.GatherAndCompare(registry, strings.NewReader(tc.expectMetric), "apiserver_watch_cache_consistent_read_total"); err != nil { + t.Errorf("unexpected error: %v", err) + } + }) + } +} + func TestGetListNonRecursiveCacheBypass(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false) backingStorage := &dummyStorage{} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go index 2c5dc4806ff..dd77febb930 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go @@ -167,6 +167,15 @@ var ( StabilityLevel: compbasemetrics.ALPHA, Buckets: []float64{0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3}, }, []string{"resource"}) + + ConsistentReadTotal = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "consistent_read_total", + Help: "Counter for consistent reads from cache.", + StabilityLevel: compbasemetrics.ALPHA, + }, []string{"resource", "success", "fallback"}) ) var registerMetrics sync.Once @@ -188,6 +197,7 @@ func Register() { legacyregistry.MustRegister(WatchCacheCapacity) legacyregistry.MustRegister(WatchCacheInitializations) legacyregistry.MustRegister(WatchCacheReadWait) + legacyregistry.MustRegister(ConsistentReadTotal) }) }