From 92c490f0bca03b6caea896ea10415a0d77c9efdc Mon Sep 17 00:00:00 2001 From: baomingwang Date: Thu, 8 Dec 2022 19:34:09 -0800 Subject: [PATCH] Added serialization from etcd error metric --- .../pkg/storage/etcd3/metrics/metrics.go | 15 +++++ .../pkg/storage/etcd3/metrics/metrics_test.go | 57 +++++++++++++++++++ .../apiserver/pkg/storage/etcd3/store.go | 31 +++++++++- 3 files changed, 100 insertions(+), 3 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go index 8255822945d..a01f50954fe 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go @@ -113,6 +113,15 @@ var ( }, []string{"resource"}, ) + decodeErrorCounts = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Namespace: "apiserver", + Name: "storage_decode_errors_total", + Help: "Number of stored object decode errors split by object type", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"resource"}, + ) ) var registerMetrics sync.Once @@ -130,6 +139,7 @@ func Register() { legacyregistry.MustRegister(listStorageNumFetched) legacyregistry.MustRegister(listStorageNumSelectorEvals) legacyregistry.MustRegister(listStorageNumReturned) + legacyregistry.MustRegister(decodeErrorCounts) }) } @@ -148,6 +158,11 @@ func RecordEtcdBookmark(resource string) { etcdBookmarkCounts.WithLabelValues(resource).Inc() } +// RecordDecodeError sets the storage_decode_errors metrics. +func RecordDecodeError(resource string) { + decodeErrorCounts.WithLabelValues(resource).Inc() +} + // Reset resets the etcd_request_duration_seconds metric. func Reset() { etcdRequestLatency.Reset() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics_test.go new file mode 100644 index 00000000000..76fe533bae2 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics_test.go @@ -0,0 +1,57 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "strings" + "testing" + + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/testutil" +) + +func TestRecordDecodeError(t *testing.T) { + registry := metrics.NewKubeRegistry() + defer registry.Reset() + registry.Register(decodeErrorCounts) + resourceName := "pods" + testedMetrics := "apiserver_storage_decode_errors_total" + testCases := []struct { + desc string + resource string + want string + }{ + { + desc: "test success", + resource: resourceName, + want: ` + # HELP apiserver_storage_decode_errors_total [ALPHA] Number of stored object decode errors split by object type + # TYPE apiserver_storage_decode_errors_total counter + apiserver_storage_decode_errors_total{resource="pods"} 1 +`, + }, + } + + for _, test := range testCases { + t.Run(test.desc, func(t *testing.T) { + RecordDecodeError(test.resource) + if err := testutil.GatherAndCompare(registry, strings.NewReader(test.want), testedMetrics); err != nil { + t.Fatal(err) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 463a84d7b91..5342be2ae91 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -156,7 +156,12 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou return storage.NewInternalError(err.Error()) } - return decode(s.codec, s.versioner, data, out, kv.ModRevision) + err = decode(s.codec, s.versioner, data, out, kv.ModRevision) + if err != nil { + recordDecodeError(s.groupResourceString, preparedKey) + return err + } + return nil } // Create implements storage.Interface.Create. @@ -220,6 +225,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, err = decode(s.codec, s.versioner, data, out, putResp.Header.Revision) if err != nil { span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error())) + recordDecodeError(s.groupResourceString, preparedKey) return err } span.AddEvent("decode succeeded", attribute.Int("len", len(data))) @@ -352,7 +358,12 @@ func (s *store) conditionalDelete( if deleteResp.Header == nil { return errors.New("invalid DeleteRange response - nil header") } - return decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision) + err = decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision) + if err != nil { + recordDecodeError(s.groupResourceString, key) + return err + } + return nil } } @@ -470,7 +481,12 @@ func (s *store) GuaranteedUpdate( } // recheck that the data from etcd is not stale before short-circuiting a write if !origState.stale { - return decode(s.codec, s.versioner, origState.data, destination, origState.rev) + err = decode(s.codec, s.versioner, origState.data, destination, origState.rev) + if err != nil { + recordDecodeError(s.groupResourceString, preparedKey) + return err + } + return nil } } @@ -518,6 +534,7 @@ func (s *store) GuaranteedUpdate( err = decode(s.codec, s.versioner, data, destination, putResp.Header.Revision) if err != nil { span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error())) + recordDecodeError(s.groupResourceString, preparedKey) return err } span.AddEvent("decode succeeded", attribute.Int("len", len(data))) @@ -745,6 +762,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption } if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil { + recordDecodeError(s.groupResourceString, string(kv.Key)) return err } numEvald++ @@ -884,6 +902,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key state.data = data state.stale = stale if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil { + recordDecodeError(s.groupResourceString, key) return nil, err } } @@ -1022,6 +1041,12 @@ func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.Selec return nil } +// recordDecodeError record decode error split by object type. +func recordDecodeError(resource string, key string) { + metrics.RecordDecodeError(resource) + klog.V(4).Infof("Decoding %s \"%s\" failed", resource, key) +} + func notFound(key string) clientv3.Cmp { return clientv3.Compare(clientv3.ModRevision(key), "=", 0) }