diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
index fe4153d57ed..d36dd263ff7 100644
--- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
+++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
@@ -71,15 +71,17 @@ func StorageWithCacher() generic.StorageDecorator {
 		if err != nil {
 			return nil, func() {}, err
 		}
+		delegator := cacherstorage.NewCacheDelegator(cacher, s)
 		var once sync.Once
 		destroyFunc := func() {
 			once.Do(func() {
+				delegator.Stop()
 				cacher.Stop()
 				d()
 			})
 		}
 
-		return cacherstorage.NewCacheDelegator(cacher, s), destroyFunc, nil
+		return delegator, destroyFunc, nil
 	}
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go
index 75796170160..24ac2a6c7a4 100644
--- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go
@@ -2459,8 +2459,10 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
 		}
 	}
 	d := destroyFunc
-	s = cacherstorage.NewCacheDelegator(cacher, s)
+	delegator := cacherstorage.NewCacheDelegator(cacher, s)
+	s = delegator
 	destroyFunc = func() {
+		delegator.Stop()
 		cacher.Stop()
 		d()
 	}
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index b56a59cf553..ec05c9962e7 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -729,7 +729,7 @@ type listResp struct {
 }
 
 // GetList implements storage.Interface
-func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, listRV uint64) error {
+func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
 	// For recursive lists, we need to make sure the key ended with "/" so that we only
 	// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
 	// with prefix "/a" will return all three, while with prefix "/a/" will return only
@@ -738,6 +738,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
 	if opts.Recursive && !strings.HasSuffix(key, "/") {
 		preparedKey += "/"
 	}
+	listRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
+	if err != nil {
+		return err
+	}
 	ctx, span := tracing.Start(ctx, "cacher.GetList",
 		attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go
index 5b368202eff..9a6538446f5 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go
@@ -484,10 +484,6 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
 		t.Fatalf("Failed to initialize cacher: %v", err)
 	}
 	ctx := context.Background()
-	terminate := func() {
-		cacher.Stop()
-		server.Terminate(t)
-	}
 
 	// Since some tests depend on the fact that GetList shouldn't fail,
 	// we wait until the error from the underlying storage is consumed.
@@ -503,8 +499,14 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
 			t.Fatal(err)
 		}
 	}
+	delegator := NewCacheDelegator(cacher, wrappedStorage)
+	terminate := func() {
+		delegator.Stop()
+		cacher.Stop()
+		server.Terminate(t)
+	}
 
-	return ctx, NewCacheDelegator(cacher, wrappedStorage), server, terminate
+	return ctx, delegator, server, terminate
 }
 
 func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
index 4daaf2814d8..1d1007145d4 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
@@ -207,6 +207,15 @@ func (d *dummyStorage) GetCurrentResourceVersion(ctx context.Context) (uint64, e
 	return 100, nil
 }
 
+type dummyCacher struct {
+	dummyStorage
+	ready bool
+}
+
+func (d *dummyCacher) Ready() bool {
+	return d.ready
+}
+
 func TestGetListCacheBypass(t *testing.T) {
 	type opts struct {
 		ResourceVersion string
@@ -326,6 +335,7 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
 	}
 	defer cacher.Stop()
 	delegator := NewCacheDelegator(cacher, backingStorage)
+	defer delegator.Stop()
 	result := &example.PodList{}
 	if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
 		if err := cacher.ready.wait(context.Background()); err != nil {
@@ -450,6 +460,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
 	}
 	defer cacher.Stop()
 	delegator := NewCacheDelegator(cacher, backingStorage)
+	defer delegator.Stop()
 	if err := cacher.ready.wait(context.Background()); err != nil {
 		t.Fatalf("unexpected error waiting for the cache to be ready")
 	}
@@ -533,6 +544,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
 	}
 	defer cacher.Stop()
 	delegator := NewCacheDelegator(cacher, backingStorage)
+	defer delegator.Stop()
 
 	pred := storage.SelectionPredicate{
 		Limit: 500,
@@ -572,6 +584,7 @@ func TestGetCacheBypass(t *testing.T) {
 	}
 	defer cacher.Stop()
 	delegator := NewCacheDelegator(cacher, backingStorage)
+	defer delegator.Stop()
 
 	result := &example.Pod{}
 
@@ -608,6 +621,7 @@ func TestWatchCacheBypass(t *testing.T) {
 	}
 	defer cacher.Stop()
 	delegator := NewCacheDelegator(cacher, backingStorage)
+	defer delegator.Stop()
 
 	if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
 		if err := cacher.ready.wait(context.Background()); err != nil {
@@ -645,6 +659,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) {
 	}
 	defer cacher.Stop()
 	delegator := NewCacheDelegator(cacher, backingStorage)
+	defer delegator.Stop()
 
 	opts := storage.ListOptions{
 		ResourceVersion: "0",
@@ -890,6 +905,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
 		t.Fatalf("Couldn't create cacher: %v", err)
 	}
 	delegator := NewCacheDelegator(cacher, backingStorage)
+	defer delegator.Stop()
 
 	if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
 		if err := cacher.ready.wait(context.Background()); err != nil {
@@ -2326,6 +2342,7 @@ func BenchmarkCacher_GetList(b *testing.B) {
 	}
 	defer cacher.Stop()
 	delegator := NewCacheDelegator(cacher, store)
+	defer delegator.Stop()
 
 	// prepare result and pred
 	parsedField, err := fields.ParseSelector("spec.nodeName=node-0")
@@ -3207,6 +3224,7 @@ func TestRetryAfterForUnreadyCache(t *testing.T) {
 	}
 	result := &example.PodList{}
 	delegator := NewCacheDelegator(cacher, backingStorage)
+	defer delegator.Stop()
 
 	err = delegator.GetList(context.TODO(), "/pods/ns", opts, result)
 	if !apierrors.IsTooManyRequests(err) {
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go
index b1a5044598d..84d96b8d3b7 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go
@@ -18,12 +18,20 @@ package cacher
 
 import (
 	"context"
+	"fmt"
+	"hash"
+	"hash/fnv"
+	"os"
+	"strconv"
+	"sync"
 	"time"
 
 	"go.opentelemetry.io/otel/attribute"
 
 	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/audit"
 	"k8s.io/apiserver/pkg/features"
@@ -35,16 +43,45 @@ import (
 	"k8s.io/klog/v2"
 )
 
+var (
+	// ConsistencyCheckPeriod is the period of checking consistency between etcd and cache.
+	// 5 minutes were proposed to match the default compaction period. It's an order of magnitude
+	// higher than the List latency SLO (30 seconds) and timeout (1 minute).
+	ConsistencyCheckPeriod = 5 * time.Minute
+	// ConsistencyCheckerEnabled enables the consistency checking mechanism for cache.
+	// Based on KUBE_WATCHCACHE_CONSISTANCY_CHECKER environment variable.
+	ConsistencyCheckerEnabled = false
+)
+
+func init() {
+	ConsistencyCheckerEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHCACHE_CONSISTANCY_CHECKER"))
+}
+
 func NewCacheDelegator(cacher *Cacher, storage storage.Interface) *CacheDelegator {
-	return &CacheDelegator{
+	d := &CacheDelegator{
 		cacher:  cacher,
 		storage: storage,
+		stopCh:  make(chan struct{}),
 	}
+	if ConsistencyCheckerEnabled {
+		d.checker = newConsistencyChecker(cacher.resourcePrefix, cacher.newListFunc, cacher, storage)
+		d.wg.Add(1)
+		go func() {
+			defer d.wg.Done()
+			d.checker.startChecking(d.stopCh)
+		}()
+	}
+	return d
 }
 
 type CacheDelegator struct {
 	cacher  *Cacher
 	storage storage.Interface
+	checker *consistencyChecker
+
+	wg       sync.WaitGroup
+	stopOnce sync.Once
+	stopCh   chan struct{}
 }
 
 var _ storage.Interface = (*CacheDelegator)(nil)
@@ -168,14 +205,18 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
 		if err != nil {
 			return err
 		}
+		// Setting resource version for consistent read in cache based on current ResourceVersion in etcd.
+		opts.ResourceVersion = strconv.FormatInt(int64(listRV), 10)
 	}
-	err = c.cacher.GetList(ctx, key, opts, listObj, listRV)
+	err = c.cacher.GetList(ctx, key, opts, listObj)
 	success := "true"
 	fallback := "false"
 	if err != nil {
 		if consistentRead {
 			if storage.IsTooLargeResourceVersion(err) {
 				fallback = "true"
+				// Reset resourceVersion during fallback from consistent read.
+				opts.ResourceVersion = ""
 				err = c.storage.GetList(ctx, key, opts, listObj)
 			}
 			if err != nil {
@@ -258,3 +299,156 @@ func (c *CacheDelegator) ReadinessCheck() error {
 func (c *CacheDelegator) RequestWatchProgress(ctx context.Context) error {
 	return c.storage.RequestWatchProgress(ctx)
 }
+
+func (c *CacheDelegator) Stop() {
+	c.stopOnce.Do(func() {
+		close(c.stopCh)
+	})
+	c.wg.Wait()
+}
+
+func newConsistencyChecker(resourcePrefix string, newListFunc func() runtime.Object, cacher getListerReady, etcd getLister) *consistencyChecker {
+	return &consistencyChecker{
+		resourcePrefix: resourcePrefix,
+		newListFunc:    newListFunc,
+		cacher:         cacher,
+		etcd:           etcd,
+	}
+}
+
+type consistencyChecker struct {
+	resourcePrefix string
+	newListFunc    func() runtime.Object
+
+	cacher getListerReady
+	etcd   getLister
+}
+
+type getListerReady interface {
+	getLister
+	Ready() bool
+}
+
+type getLister interface {
+	GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error
+}
+
+func (c consistencyChecker) startChecking(stopCh <-chan struct{}) {
+	err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), ConsistencyCheckPeriod, false, func(ctx context.Context) (done bool, err error) {
+		c.check(ctx)
+		return false, nil
+	})
+	if err != nil {
+		klog.InfoS("Cache consistency check exiting", "resource", c.resourcePrefix, "err", err)
+	}
+}
+
+func (c *consistencyChecker) check(ctx context.Context) {
+	digests, err := c.calculateDigests(ctx)
+	if err != nil {
+		klog.ErrorS(err, "Cache consistency check error", "resource", c.resourcePrefix)
+		metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "error").Inc()
+		return
+	}
+	if digests.CacheDigest == digests.EtcdDigest {
+		klog.V(3).InfoS("Cache consistency check passed", "resource", c.resourcePrefix, "resourceVersion", digests.ResourceVersion, "digest", digests.CacheDigest)
+		metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "success").Inc()
+	} else {
+		klog.ErrorS(nil, "Cache consistency check failed", "resource", c.resourcePrefix, "resourceVersion", digests.ResourceVersion, "etcdDigest", digests.EtcdDigest, "cacheDigest", digests.CacheDigest)
+		metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "failure").Inc()
+	}
+}
+
+func (c *consistencyChecker) calculateDigests(ctx context.Context) (*storageDigest, error) {
+	if !c.cacher.Ready() {
+		return nil, fmt.Errorf("cache is not ready")
+	}
+	cacheDigest, resourceVersion, err := c.calculateStoreDigest(ctx, c.cacher, storage.ListOptions{
+		ResourceVersion:      "0",
+		Predicate:            storage.Everything,
+		ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed calculating cache digest: %w", err)
+	}
+	etcdDigest, _, err := c.calculateStoreDigest(ctx, c.etcd, storage.ListOptions{
+		ResourceVersion:      resourceVersion,
+		Predicate:            storage.Everything,
+		ResourceVersionMatch: metav1.ResourceVersionMatchExact,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed calculating etcd digest: %w", err)
+	}
+	return &storageDigest{
+		ResourceVersion: resourceVersion,
+		CacheDigest:     cacheDigest,
+		EtcdDigest:      etcdDigest,
+	}, nil
+}
+
+type storageDigest struct {
+	ResourceVersion string
+	CacheDigest     string
+	EtcdDigest      string
+}
+
+func (c *consistencyChecker) calculateStoreDigest(ctx context.Context, store getLister, opts storage.ListOptions) (digest, rv string, err error) {
+	// TODO: Implement pagination
+	resp := c.newListFunc()
+	err = store.GetList(ctx, c.resourcePrefix, opts, resp)
+	if err != nil {
+		return "", "", err
+	}
+	digest, err = listDigest(resp)
+	if err != nil {
+		return "", "", err
+	}
+	list, err := meta.ListAccessor(resp)
+	if err != nil {
+		return "", "", err
+	}
+	return digest, list.GetResourceVersion(), nil
+}
+
+func listDigest(list runtime.Object) (string, error) {
+	h := fnv.New64()
+	err := meta.EachListItem(list, func(obj runtime.Object) error {
+		objectMeta, err := meta.Accessor(obj)
+		if err != nil {
+			return err
+		}
+		err = addObjectToDigest(h, objectMeta)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+	if err != nil {
+		return "", err
+	}
+	return fmt.Sprintf("%x", h.Sum64()), nil
+}
+
+func addObjectToDigest(h hash.Hash64, objectMeta metav1.Object) error {
+	_, err := h.Write([]byte(objectMeta.GetNamespace()))
+	if err != nil {
+		return err
+	}
+	_, err = h.Write([]byte("/"))
+	if err != nil {
+		return err
+	}
+	_, err = h.Write([]byte(objectMeta.GetName()))
+	if err != nil {
+		return err
+	}
+	_, err = h.Write([]byte("/"))
+	if err != nil {
+		return err
+	}
+	_, err = h.Write([]byte(objectMeta.GetResourceVersion()))
+	if err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator_test.go
new file mode 100644
index 00000000000..f812e169f54
--- /dev/null
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator_test.go
@@ -0,0 +1,194 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cacher
+
+import (
+	"context"
+	"testing"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apiserver/pkg/apis/example"
+	"k8s.io/apiserver/pkg/storage"
+)
+
+func TestCalculateDigest(t *testing.T) {
+	newListFunc := func() runtime.Object { return &example.PodList{} }
+	testCases := []struct {
+		desc            string
+		resourceVersion string
+		cacherReady     bool
+		cacherItems     []example.Pod
+		etcdItems       []example.Pod
+		resourcePrefix  string
+
+		expectListKey string
+		expectDigest  storageDigest
+		expectErr     bool
+	}{
+		{
+			desc:            "not ready",
+			cacherReady:     false,
+			resourceVersion: "1",
+			expectErr:       true,
+		},
+		{
+			desc:            "empty",
+			resourceVersion: "1",
+			cacherReady:     true,
+			expectDigest: storageDigest{
+				ResourceVersion: "1",
+				CacheDigest:     "cbf29ce484222325",
+				EtcdDigest:      "cbf29ce484222325",
+			},
+		},
+		{
+			desc:            "with one element equal",
+			resourceVersion: "2",
+			cacherReady:     true,
+			cacherItems: []example.Pod{
+				{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "2"}},
+			},
+			etcdItems: []example.Pod{
+				{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "2"}},
+			},
+			expectDigest: storageDigest{
+				ResourceVersion: "2",
+				CacheDigest:     "86bf3a5e80d1c5cb",
+				EtcdDigest:      "86bf3a5e80d1c5cb",
+			},
+		},
+		{
+			desc:            "namespace changes digest",
+			resourceVersion: "2",
+			cacherReady:     true,
+			cacherItems: []example.Pod{
+				{ObjectMeta: metav1.ObjectMeta{Namespace: "kube-system", Name: "pod", ResourceVersion: "2"}},
+			},
+			etcdItems: []example.Pod{
+				{ObjectMeta: metav1.ObjectMeta{Namespace: "kube-public", Name: "pod", ResourceVersion: "2"}},
+			},
+			expectDigest: storageDigest{
+				ResourceVersion: "2",
+				CacheDigest:     "4ae4e750bd825b17",
+				EtcdDigest:      "f940a60af965b03",
+			},
+		},
+		{
+			desc:            "name changes digest",
+			resourceVersion: "2",
+			cacherReady:     true,
+			cacherItems: []example.Pod{
+				{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod2", ResourceVersion: "2"}},
+			},
+			etcdItems: []example.Pod{
+				{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod3", ResourceVersion: "2"}},
+			},
+			expectDigest: storageDigest{
+				ResourceVersion: "2",
+				CacheDigest:     "c9120494e4c1897d",
+				EtcdDigest:      "c9156494e4c46274",
+			},
+		},
+		{
+			desc:            "resourceVersion changes digest",
+			resourceVersion: "2",
+			cacherReady:     true,
+			cacherItems: []example.Pod{
+				{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "3"}},
+			},
+			etcdItems: []example.Pod{
+				{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "4"}},
+			},
+			expectDigest: storageDigest{
+				ResourceVersion: "2",
+				CacheDigest:     "86bf3a5e80d1c5ca",
+				EtcdDigest:      "86bf3a5e80d1c5cd",
+			},
+		},
+		{
+			desc:            "watch missed write event",
+			resourceVersion: "3",
+			cacherReady:     true,
+			cacherItems: []example.Pod{
+				{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "2"}},
+			},
+			etcdItems: []example.Pod{
+				{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "2"}},
+				{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "3"}},
+			},
+			expectDigest: storageDigest{
+				ResourceVersion: "3",
+				CacheDigest:     "1859bac707c2cb2b",
+				EtcdDigest:      "11d147fc800df0e0",
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			etcd := &dummyStorage{
+				getListFn: func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
+					if key != tc.expectListKey {
+						t.Fatalf("Expect GetList key %q, got %q", tc.expectListKey, key)
+					}
+					if opts.ResourceVersion != tc.resourceVersion {
+						t.Fatalf("Expect GetList resourceVersion %q, got %q", tc.resourceVersion, opts.ResourceVersion)
+					}
+					if opts.ResourceVersionMatch != metav1.ResourceVersionMatchExact {
+						t.Fatalf("Expect GetList match exact, got %q", opts.ResourceVersionMatch)
+					}
+					podList := listObj.(*example.PodList)
+					podList.Items = tc.etcdItems
+					podList.ResourceVersion = tc.resourceVersion
+					return nil
+				},
+			}
+			cacher := &dummyCacher{
+				ready: tc.cacherReady,
+				dummyStorage: dummyStorage{
+					getListFn: func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
+						if key != tc.expectListKey {
+							t.Fatalf("Expect GetList key %q, got %q", tc.expectListKey, key)
+						}
+						if opts.ResourceVersion != "0" {
+							t.Fatalf("Expect GetList resourceVersion 0, got %q", opts.ResourceVersion)
+						}
+						if opts.ResourceVersionMatch != metav1.ResourceVersionMatchNotOlderThan {
+							t.Fatalf("Expect GetList match not older than, got %q", opts.ResourceVersionMatch)
+						}
+						podList := listObj.(*example.PodList)
+						podList.Items = tc.cacherItems
+						podList.ResourceVersion = tc.resourceVersion
+						return nil
+					},
+				},
+			}
+			checker := newConsistencyChecker(tc.resourcePrefix, newListFunc, cacher, etcd)
+			digest, err := checker.calculateDigests(context.Background())
+			if (err != nil) != tc.expectErr {
+				t.Fatalf("Expect error: %v, got: %v", tc.expectErr, err)
+			}
+			if err != nil {
+				return
+			}
+			if *digest != tc.expectDigest {
+				t.Errorf("Expect: %+v Got: %+v", &tc.expectDigest, *digest)
+			}
+		})
+	}
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go
index dd77febb930..0559708d296 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go
@@ -176,6 +176,14 @@ var (
 			Help:           "Counter for consistent reads from cache.",
 			StabilityLevel: compbasemetrics.ALPHA,
 		}, []string{"resource", "success", "fallback"})
+
+	StorageConsistencyCheckTotal = compbasemetrics.NewCounterVec(
+		&compbasemetrics.CounterOpts{
+			Namespace:      namespace,
+			Name:           "storage_consistency_checks_total",
+			Help:           "Counter for status of consistency checks between etcd and watch cache",
+			StabilityLevel: compbasemetrics.INTERNAL,
+		}, []string{"resource", "status"})
 )
 
 var registerMetrics sync.Once
@@ -198,6 +206,7 @@ func Register() {
 		legacyregistry.MustRegister(WatchCacheInitializations)
 		legacyregistry.MustRegister(WatchCacheReadWait)
 		legacyregistry.MustRegister(ConsistentReadTotal)
+		legacyregistry.MustRegister(StorageConsistencyCheckTotal)
 	})
 }
diff --git a/test/integration/metrics/metrics_test.go b/test/integration/metrics/metrics_test.go
index dd1328a40d4..c65d80cf6e0 100644
--- a/test/integration/metrics/metrics_test.go
+++ b/test/integration/metrics/metrics_test.go
@@ -20,16 +20,22 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"io"
+	"net/http"
 	"runtime"
 	"slices"
 	"strings"
 	"testing"
+	"time"
 
+	dto "github.com/prometheus/client_model/go"
+	"github.com/prometheus/common/expfmt"
 	"github.com/prometheus/common/model"
 	admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apiserver/pkg/endpoints/metrics"
+	"k8s.io/apiserver/pkg/storage/cacher"
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
 	compbasemetrics "k8s.io/component-base/metrics"
@@ -619,3 +625,108 @@ func sampleExistsInSamples(s *model.Sample, samples model.Samples) bool {
 	}
 	return false
 }
+
+func TestWatchCacheConsistencyCheckMetrics(t *testing.T) {
+	period := time.Second
+	clean := overrideConsistencyCheckerTimings(period)
+	defer clean()
+	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
+	defer server.TearDownFn()
+
+	rt, err := restclient.TransportFor(server.ClientConfig)
+	if err != nil {
+		t.Fatal(err)
+	}
+	req, err := http.NewRequest(http.MethodGet, server.ClientConfig.Host+"/metrics", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Do at least 2 scrape cycles to require 2 successes
+	delay := 2 * period
+	time.Sleep(delay)
+	resp, err := rt.RoundTrip(req)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		err := resp.Body.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+	statuses, err := parseConsistencyCheckMetric(resp.Body)
+	if err != nil {
+		t.Fatal(err)
+	}
+	resourceSuccesses := 0
+	for status, count := range statuses {
+		switch status.status {
+		case "success":
+			if count >= 2 {
+				resourceSuccesses++
+			}
+		case "failure":
+			t.Errorf("Failure checking consistency of resource %q", status.resource)
+		case "error":
+			t.Errorf("Error when checking consistency of resource %q", status.resource)
+		default:
+			t.Errorf("Unknown status of resource %q, status: %q", status.resource, status.status)
+		}
+	}
+	if resourceSuccesses <= 10 {
+		t.Errorf("Expected at least 10 resources with success, got: %d", resourceSuccesses)
+	}
+}
+
+func overrideConsistencyCheckerTimings(period time.Duration) func() {
+	tmpPeriod := cacher.ConsistencyCheckPeriod
+	tmpEnabled := cacher.ConsistencyCheckerEnabled
+	cacher.ConsistencyCheckPeriod = period
+	cacher.ConsistencyCheckerEnabled = true
+	return func() {
+		cacher.ConsistencyCheckPeriod = tmpPeriod
+		cacher.ConsistencyCheckerEnabled = tmpEnabled
+	}
+}
+
+func parseConsistencyCheckMetric(r io.Reader) (map[consistencyCheckStatus]float64, error) {
+	statuses := map[consistencyCheckStatus]float64{}
+	metric, err := parseMetric(r, "apiserver_storage_consistency_checks_total")
+	if err != nil {
+		return statuses, err
+	}
+	for _, m := range metric.GetMetric() {
+		status := consistencyCheckStatus{}
+		for _, label := range m.GetLabel() {
+			switch label.GetName() {
+			case "resource":
+				status.resource = label.GetValue()
+			case "status":
+				status.status = label.GetValue()
+			default:
+				return statuses, fmt.Errorf("Unknown label: %v", label.GetName())
+			}
+		}
+		statuses[status] = m.GetCounter().GetValue()
+	}
+	return statuses, nil
+}
+
+type consistencyCheckStatus struct {
+	resource string
+	status   string
+}
+
+func parseMetric(r io.Reader, name string) (*dto.MetricFamily, error) {
+	var parser expfmt.TextParser
+	mfs, err := parser.TextToMetricFamilies(r)
+	if err != nil {
+		return nil, err
+	}
+	for metricName, metric := range mfs {
+		if metricName == name {
+			return metric, nil
+		}
+	}
+	return nil, fmt.Errorf("Metric not found %q", name)
+}