Merge pull request #130475 from serathius/watchcache-consistency

Implement consistency checking
This commit is contained in:
Kubernetes Prow Robot 2025-03-11 05:09:46 -07:00 committed by GitHub
commit 4c311c9fcf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 546 additions and 10 deletions

View File

@@ -71,15 +71,17 @@ func StorageWithCacher() generic.StorageDecorator {
if err != nil {
return nil, func() {}, err
}
delegator := cacherstorage.NewCacheDelegator(cacher, s)
var once sync.Once
destroyFunc := func() {
once.Do(func() {
delegator.Stop()
cacher.Stop()
d()
})
}
-return cacherstorage.NewCacheDelegator(cacher, s), destroyFunc, nil
+return delegator, destroyFunc, nil
}
}
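
One detail worth noting in the hunk above: destroyFunc can be invoked more than once, so the teardown is wrapped in sync.Once and stops the delegator before the cacher it wraps. A minimal, self-contained sketch of the idiom (the component names here are hypothetical; only the sync.Once pattern is the point):

package main

import (
	"fmt"
	"sync"
)

// newTeardown returns an idempotent teardown that runs the given stop
// functions once, in order (list dependents before their dependencies).
func newTeardown(stops ...func()) func() {
	var once sync.Once
	return func() {
		once.Do(func() {
			for _, stop := range stops {
				stop()
			}
		})
	}
}

func main() {
	destroy := newTeardown(
		func() { fmt.Println("delegator stopped") },
		func() { fmt.Println("cacher stopped") },
	)
	destroy()
	destroy() // No-op: sync.Once ran the body exactly once.
}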

View File

@@ -2459,8 +2459,10 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
}
}
d := destroyFunc
-s = cacherstorage.NewCacheDelegator(cacher, s)
+delegator := cacherstorage.NewCacheDelegator(cacher, s)
+s = delegator
destroyFunc = func() {
delegator.Stop()
cacher.Stop()
d()
}

View File

@@ -729,7 +729,7 @@ type listResp struct {
}
// GetList implements storage.Interface
-func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, listRV uint64) error {
+func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
// For recursive lists, we need to make sure the key ended with "/" so that we only
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
// with prefix "/a" will return all three, while with prefix "/a/" will return only
@@ -738,6 +738,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
if opts.Recursive && !strings.HasSuffix(key, "/") {
preparedKey += "/"
}
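// The resourceVersion is now parsed from opts here instead of being passed in
// as listRV; for consistent reads the delegator encodes etcd's current revision
// into opts.ResourceVersion before calling this method.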
listRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return err
}
ctx, span := tracing.Start(ctx, "cacher.GetList",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),

View File

@@ -484,10 +484,6 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
t.Fatalf("Failed to initialize cacher: %v", err)
}
ctx := context.Background()
-terminate := func() {
-cacher.Stop()
-server.Terminate(t)
-}
// Since some tests depend on the fact that GetList shouldn't fail,
// we wait until the error from the underlying storage is consumed.
@@ -503,8 +499,14 @@
t.Fatal(err)
}
}
delegator := NewCacheDelegator(cacher, wrappedStorage)
terminate := func() {
delegator.Stop()
cacher.Stop()
server.Terminate(t)
}
-return ctx, NewCacheDelegator(cacher, wrappedStorage), server, terminate
+return ctx, delegator, server, terminate
}
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {

View File

@@ -207,6 +207,15 @@ func (d *dummyStorage) GetCurrentResourceVersion(ctx context.Context) (uint64, e
return 100, nil
}
type dummyCacher struct {
dummyStorage
ready bool
}
func (d *dummyCacher) Ready() bool {
return d.ready
}
func TestGetListCacheBypass(t *testing.T) {
type opts struct {
ResourceVersion string
@@ -326,6 +335,7 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
result := &example.PodList{}
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
@@ -450,6 +460,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
@@ -533,6 +544,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
pred := storage.SelectionPredicate{
Limit: 500,
@@ -572,6 +584,7 @@ func TestGetCacheBypass(t *testing.T) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
result := &example.Pod{}
@@ -608,6 +621,7 @@ func TestWatchCacheBypass(t *testing.T) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
@@ -645,6 +659,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
opts := storage.ListOptions{
ResourceVersion: "0",
@@ -890,6 +905,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
t.Fatalf("Couldn't create cacher: %v", err)
}
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
@@ -2326,6 +2342,7 @@ func BenchmarkCacher_GetList(b *testing.B) {
}
defer cacher.Stop()
delegator := NewCacheDelegator(cacher, store)
defer delegator.Stop()
// prepare result and pred
parsedField, err := fields.ParseSelector("spec.nodeName=node-0")
@@ -3207,6 +3224,7 @@ func TestRetryAfterForUnreadyCache(t *testing.T) {
}
result := &example.PodList{}
delegator := NewCacheDelegator(cacher, backingStorage)
defer delegator.Stop()
err = delegator.GetList(context.TODO(), "/pods/ns", opts, result)
if !apierrors.IsTooManyRequests(err) {

View File

@@ -18,12 +18,20 @@ package cacher
import (
"context"
"fmt"
"hash"
"hash/fnv"
"os"
"strconv"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/features"
@@ -35,16 +43,45 @@ import (
"k8s.io/klog/v2"
)
var (
// ConsistencyCheckPeriod is the period of checking consistency between etcd and cache.
// 5 minutes was proposed to match the default compaction period. It is an order of
// magnitude higher than the List latency SLO (30 seconds) and timeout (1 minute).
ConsistencyCheckPeriod = 5 * time.Minute
// ConsistencyCheckerEnabled enables the consistency checking mechanism for cache.
// Based on the KUBE_WATCHCACHE_CONSISTENCY_CHECKER environment variable.
ConsistencyCheckerEnabled = false
)
func init() {
ConsistencyCheckerEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHCACHE_CONSISTENCY_CHECKER"))
}
func NewCacheDelegator(cacher *Cacher, storage storage.Interface) *CacheDelegator {
return &CacheDelegator{
d := &CacheDelegator{
cacher: cacher,
storage: storage,
stopCh: make(chan struct{}),
}
if ConsistencyCheckerEnabled {
d.checker = newConsistencyChecker(cacher.resourcePrefix, cacher.newListFunc, cacher, storage)
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.checker.startChecking(d.stopCh)
}()
}
return d
}
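// CacheDelegator serves reads from the watch cache when it is able to and
// falls back to the underlying storage otherwise; it also owns the optional
// background consistency checker started in NewCacheDelegator.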
type CacheDelegator struct {
cacher *Cacher
storage storage.Interface
checker *consistencyChecker
wg sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
}
var _ storage.Interface = (*CacheDelegator)(nil)
@@ -168,14 +205,18 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
if err != nil {
return err
}
// Setting resource version for consistent read in cache based on current ResourceVersion in etcd.
opts.ResourceVersion = strconv.FormatInt(int64(listRV), 10)
}
-err = c.cacher.GetList(ctx, key, opts, listObj, listRV)
+err = c.cacher.GetList(ctx, key, opts, listObj)
success := "true"
fallback := "false"
if err != nil {
if consistentRead {
if storage.IsTooLargeResourceVersion(err) {
fallback = "true"
// Reset resourceVersion during fallback from consistent read.
opts.ResourceVersion = ""
err = c.storage.GetList(ctx, key, opts, listObj)
}
if err != nil {
@@ -258,3 +299,156 @@ func (c *CacheDelegator) ReadinessCheck() error {
func (c *CacheDelegator) RequestWatchProgress(ctx context.Context) error {
return c.storage.RequestWatchProgress(ctx)
}
func (c *CacheDelegator) Stop() {
c.stopOnce.Do(func() {
close(c.stopCh)
})
c.wg.Wait()
}
func newConsistencyChecker(resourcePrefix string, newListFunc func() runtime.Object, cacher getListerReady, etcd getLister) *consistencyChecker {
return &consistencyChecker{
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
cacher: cacher,
etcd: etcd,
}
}
type consistencyChecker struct {
resourcePrefix string
newListFunc func() runtime.Object
cacher getListerReady
etcd getLister
}
type getListerReady interface {
getLister
Ready() bool
}
type getLister interface {
GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error
}
func (c *consistencyChecker) startChecking(stopCh <-chan struct{}) {
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), ConsistencyCheckPeriod, false, func(ctx context.Context) (done bool, err error) {
c.check(ctx)
return false, nil
})
if err != nil {
klog.InfoS("Cache consistency check exiting", "resource", c.resourcePrefix, "err", err)
}
}
func (c *consistencyChecker) check(ctx context.Context) {
digests, err := c.calculateDigests(ctx)
if err != nil {
klog.ErrorS(err, "Cache consistentency check error", "resource", c.resourcePrefix)
metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "error").Inc()
return
}
if digests.CacheDigest == digests.EtcdDigest {
klog.V(3).InfoS("Cache consistency check passed", "resource", c.resourcePrefix, "resourceVersion", digests.ResourceVersion, "digest", digests.CacheDigest)
metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "success").Inc()
} else {
klog.ErrorS(nil, "Cache consistency check failed", "resource", c.resourcePrefix, "resourceVersion", digests.ResourceVersion, "etcdDigest", digests.EtcdDigest, "cacheDigest", digests.CacheDigest)
metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "failure").Inc()
}
}
func (c *consistencyChecker) calculateDigests(ctx context.Context) (*storageDigest, error) {
if !c.cacher.Ready() {
return nil, fmt.Errorf("cache is not ready")
}
cacheDigest, resourceVersion, err := c.calculateStoreDigest(ctx, c.cacher, storage.ListOptions{
ResourceVersion: "0",
Predicate: storage.Everything,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
})
if err != nil {
return nil, fmt.Errorf("failed calculating cache digest: %w", err)
}
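// List etcd at exactly the resourceVersion the cache responded with, so that
// both digests describe the same logical snapshot of the resource.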
etcdDigest, _, err := c.calculateStoreDigest(ctx, c.etcd, storage.ListOptions{
ResourceVersion: resourceVersion,
Predicate: storage.Everything,
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
})
if err != nil {
return nil, fmt.Errorf("failed calculating etcd digest: %w", err)
}
return &storageDigest{
ResourceVersion: resourceVersion,
CacheDigest: cacheDigest,
EtcdDigest: etcdDigest,
}, nil
}
type storageDigest struct {
ResourceVersion string
CacheDigest string
EtcdDigest string
}
func (c *consistencyChecker) calculateStoreDigest(ctx context.Context, store getLister, opts storage.ListOptions) (digest, rv string, err error) {
// TODO: Implement pagination
resp := c.newListFunc()
err = store.GetList(ctx, c.resourcePrefix, opts, resp)
if err != nil {
return "", "", err
}
digest, err = listDigest(resp)
if err != nil {
return "", "", err
}
list, err := meta.ListAccessor(resp)
if err != nil {
return "", "", err
}
return digest, list.GetResourceVersion(), nil
}
func listDigest(list runtime.Object) (string, error) {
h := fnv.New64()
err := meta.EachListItem(list, func(obj runtime.Object) error {
objectMeta, err := meta.Accessor(obj)
if err != nil {
return err
}
err = addObjectToDigest(h, objectMeta)
if err != nil {
return err
}
return nil
})
if err != nil {
return "", err
}
return fmt.Sprintf("%x", h.Sum64()), nil
}
func addObjectToDigest(h hash.Hash64, objectMeta metav1.Object) error {
_, err := h.Write([]byte(objectMeta.GetNamespace()))
if err != nil {
return err
}
_, err = h.Write([]byte("/"))
if err != nil {
return err
}
_, err = h.Write([]byte(objectMeta.GetName()))
if err != nil {
return err
}
_, err = h.Write([]byte("/"))
if err != nil {
return err
}
_, err = h.Write([]byte(objectMeta.GetResourceVersion()))
if err != nil {
return err
}
return nil
}
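
The digest above is a plain FNV-1 64-bit hash over the namespace, name, and resourceVersion of every list item, separated by "/". A self-contained sketch that reproduces it; an empty list hashes to the FNV-64 offset basis, cbf29ce484222325, which is exactly the "empty" digest expected by the tests below:

package main

import (
	"fmt"
	"hash/fnv"
)

type item struct{ namespace, name, resourceVersion string }

// digest mirrors listDigest/addObjectToDigest above for plain structs.
func digest(items []item) string {
	h := fnv.New64()
	for _, it := range items {
		h.Write([]byte(it.namespace))
		h.Write([]byte("/"))
		h.Write([]byte(it.name))
		h.Write([]byte("/"))
		h.Write([]byte(it.resourceVersion))
	}
	return fmt.Sprintf("%x", h.Sum64())
}

func main() {
	fmt.Println(digest(nil)) // cbf29ce484222325 (FNV-64 offset basis)
	// Matches the "with one element equal" fixture below: 86bf3a5e80d1c5cb.
	fmt.Println(digest([]item{{"default", "pod", "2"}}))
}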

View File

@@ -0,0 +1,194 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"context"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/apis/example"
"k8s.io/apiserver/pkg/storage"
)
func TestCalculateDigest(t *testing.T) {
newListFunc := func() runtime.Object { return &example.PodList{} }
testCases := []struct {
desc string
resourceVersion string
cacherReady bool
cacherItems []example.Pod
etcdItems []example.Pod
resourcePrefix string
expectListKey string
expectDigest storageDigest
expectErr bool
}{
{
desc: "not ready",
cacherReady: false,
resourceVersion: "1",
expectErr: true,
},
{
desc: "empty",
resourceVersion: "1",
cacherReady: true,
expectDigest: storageDigest{
ResourceVersion: "1",
CacheDigest: "cbf29ce484222325",
EtcdDigest: "cbf29ce484222325",
},
},
{
desc: "with one element equal",
resourceVersion: "2",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "2"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "2"}},
},
expectDigest: storageDigest{
ResourceVersion: "2",
CacheDigest: "86bf3a5e80d1c5cb",
EtcdDigest: "86bf3a5e80d1c5cb",
},
},
{
desc: "namespace changes digest",
resourceVersion: "2",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "kube-system", Name: "pod", ResourceVersion: "2"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "kube-public", Name: "pod", ResourceVersion: "2"}},
},
expectDigest: storageDigest{
ResourceVersion: "2",
CacheDigest: "4ae4e750bd825b17",
EtcdDigest: "f940a60af965b03",
},
},
{
desc: "name changes digest",
resourceVersion: "2",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod2", ResourceVersion: "2"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod3", ResourceVersion: "2"}},
},
expectDigest: storageDigest{
ResourceVersion: "2",
CacheDigest: "c9120494e4c1897d",
EtcdDigest: "c9156494e4c46274",
},
},
{
desc: "resourceVersion changes digest",
resourceVersion: "2",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "3"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "4"}},
},
expectDigest: storageDigest{
ResourceVersion: "2",
CacheDigest: "86bf3a5e80d1c5ca",
EtcdDigest: "86bf3a5e80d1c5cd",
},
},
{
desc: "watch missed write event",
resourceVersion: "3",
cacherReady: true,
cacherItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "2"}},
},
etcdItems: []example.Pod{
{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "2"}},
{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "3"}},
},
expectDigest: storageDigest{
ResourceVersion: "3",
CacheDigest: "1859bac707c2cb2b",
EtcdDigest: "11d147fc800df0e0",
},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
etcd := &dummyStorage{
getListFn: func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
if key != tc.expectListKey {
t.Fatalf("Expect GetList key %q, got %q", tc.expectListKey, key)
}
if opts.ResourceVersion != tc.resourceVersion {
t.Fatalf("Expect GetList resourceVersion %q, got %q", tc.resourceVersion, opts.ResourceVersion)
}
if opts.ResourceVersionMatch != metav1.ResourceVersionMatchExact {
t.Fatalf("Expect GetList match exact, got %q", opts.ResourceVersionMatch)
}
podList := listObj.(*example.PodList)
podList.Items = tc.etcdItems
podList.ResourceVersion = tc.resourceVersion
return nil
},
}
cacher := &dummyCacher{
ready: tc.cacherReady,
dummyStorage: dummyStorage{
getListFn: func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
if key != tc.expectListKey {
t.Fatalf("Expect GetList key %q, got %q", tc.expectListKey, key)
}
if opts.ResourceVersion != "0" {
t.Fatalf("Expect GetList resourceVersion 0, got %q", opts.ResourceVersion)
}
if opts.ResourceVersionMatch != metav1.ResourceVersionMatchNotOlderThan {
t.Fatalf("Expect GetList match not older than, got %q", opts.ResourceVersionMatch)
}
podList := listObj.(*example.PodList)
podList.Items = tc.cacherItems
podList.ResourceVersion = tc.resourceVersion
return nil
},
},
}
checker := newConsistencyChecker(tc.resourcePrefix, newListFunc, cacher, etcd)
digest, err := checker.calculateDigests(context.Background())
if (err != nil) != tc.expectErr {
t.Fatalf("Expect error: %v, got: %v", tc.expectErr, err)
}
if err != nil {
return
}
if *digest != tc.expectDigest {
t.Errorf("Expect: %+v Got: %+v", &tc.expectDigest, *digest)
}
})
}
}

View File

@@ -176,6 +176,14 @@ var (
Help: "Counter for consistent reads from cache.",
StabilityLevel: compbasemetrics.ALPHA,
}, []string{"resource", "success", "fallback"})
StorageConsistencyCheckTotal = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Name: "storage_consistency_checks_total",
Help: "Counter for status of consistency checks between etcd and watch cache",
StabilityLevel: compbasemetrics.INTERNAL,
}, []string{"resource", "status"})
)
var registerMetrics sync.Once
@@ -198,6 +206,7 @@ func Register() {
legacyregistry.MustRegister(WatchCacheInitializations)
legacyregistry.MustRegister(WatchCacheReadWait)
legacyregistry.MustRegister(ConsistentReadTotal)
legacyregistry.MustRegister(StorageConsistencyCheckTotal)
})
}

View File

@@ -20,16 +20,22 @@ import (
"context"
"errors"
"fmt"
"io"
"net/http"
"runtime"
"slices"
"strings"
"testing"
"time"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/storage/cacher"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
compbasemetrics "k8s.io/component-base/metrics"
@@ -619,3 +625,108 @@ func sampleExistsInSamples(s *model.Sample, samples model.Samples) bool {
}
return false
}
func TestWatchCacheConsistencyCheckMetrics(t *testing.T) {
period := time.Second
clean := overrideConsistencyCheckerTimings(period)
defer clean()
server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer server.TearDownFn()
rt, err := restclient.TransportFor(server.ClientConfig)
if err != nil {
t.Fatal(err)
}
req, err := http.NewRequest(http.MethodGet, server.ClientConfig.Host+"/metrics", nil)
if err != nil {
t.Fatal(err)
}
// Wait at least 2 consistency check periods so each resource can record 2 successes.
delay := 2 * period
time.Sleep(delay)
resp, err := rt.RoundTrip(req)
if err != nil {
t.Fatal(err)
}
defer func() {
err := resp.Body.Close()
if err != nil {
t.Fatal(err)
}
}()
statuses, err := parseConsistencyCheckMetric(resp.Body)
if err != nil {
t.Fatal(err)
}
resourceSuccesses := 0
for status, count := range statuses {
switch status.status {
case "success":
if count >= 2 {
resourceSuccesses++
}
case "failure":
t.Errorf("Failure checking consistency of resource %q", status.resource)
case "error":
t.Errorf("Error when checking consistency of resource %q", status.resource)
default:
t.Errorf("Unknown status of resource %q, status: %q", status.resource, status.status)
}
}
if resourceSuccesses <= 10 {
t.Errorf("Expected at least 10 resources with success, got: %d", resourceSuccesses)
}
}
func overrideConsistencyCheckerTimings(period time.Duration) func() {
tmpPeriod := cacher.ConsistencyCheckPeriod
tmpEnabled := cacher.ConsistencyCheckerEnabled
cacher.ConsistencyCheckPeriod = period
cacher.ConsistencyCheckerEnabled = true
return func() {
cacher.ConsistencyCheckPeriod = tmpPeriod
cacher.ConsistencyCheckerEnabled = tmpEnabled
}
}
func parseConsistencyCheckMetric(r io.Reader) (map[consistencyCheckStatus]float64, error) {
statuses := map[consistencyCheckStatus]float64{}
metric, err := parseMetric(r, "apiserver_storage_consistency_checks_total")
if err != nil {
return statuses, err
}
for _, m := range metric.GetMetric() {
status := consistencyCheckStatus{}
for _, label := range m.GetLabel() {
switch label.GetName() {
case "resource":
status.resource = label.GetValue()
case "status":
status.status = label.GetValue()
default:
return statuses, fmt.Errorf("Unknown label: %v", label.GetName())
}
}
statuses[status] = m.GetCounter().GetValue()
}
return statuses, nil
}
type consistencyCheckStatus struct {
resource string
status string
}
func parseMetric(r io.Reader, name string) (*dto.MetricFamily, error) {
var parser expfmt.TextParser
mfs, err := parser.TextToMetricFamilies(r)
if err != nil {
return nil, err
}
for metricName, metric := range mfs {
if metricName == name {
return metric, nil
}
}
return nil, fmt.Errorf("Metric not found %q", name)
}
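
A quick sketch of how these helpers consume the exposition format; the sample text below is illustrative rather than captured from a real server, and the example assumes the helpers in this file plus "strings" among the imports:

func ExampleParseConsistencyCheckMetric() {
	sample := `# TYPE apiserver_storage_consistency_checks_total counter
apiserver_storage_consistency_checks_total{resource="pods",status="success"} 2
apiserver_storage_consistency_checks_total{resource="nodes",status="failure"} 1
`
	statuses, err := parseConsistencyCheckMetric(strings.NewReader(sample))
	if err != nil {
		panic(err)
	}
	fmt.Println(statuses[consistencyCheckStatus{resource: "pods", status: "success"}])
	fmt.Println(statuses[consistencyCheckStatus{resource: "nodes", status: "failure"}])
	// Output:
	// 2
	// 1
}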