Merge pull request #114376 from baomingwang/serialization-error-metric

Added serialization from etcd error metric
This commit is contained in:
Kubernetes Prow Robot 2023-02-07 13:46:59 -08:00 committed by GitHub
commit dfb976e25a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 100 additions and 3 deletions

View File

@ -113,6 +113,15 @@ var (
},
[]string{"resource"},
)
decodeErrorCounts = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: "apiserver",
Name: "storage_decode_errors_total",
Help: "Number of stored object decode errors split by object type",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
)
var registerMetrics sync.Once
@ -130,6 +139,7 @@ func Register() {
legacyregistry.MustRegister(listStorageNumFetched)
legacyregistry.MustRegister(listStorageNumSelectorEvals)
legacyregistry.MustRegister(listStorageNumReturned)
legacyregistry.MustRegister(decodeErrorCounts)
})
}
@ -148,6 +158,11 @@ func RecordEtcdBookmark(resource string) {
etcdBookmarkCounts.WithLabelValues(resource).Inc()
}
// RecordDecodeError sets the storage_decode_errors metrics.
func RecordDecodeError(resource string) {
decodeErrorCounts.WithLabelValues(resource).Inc()
}
// Reset resets the etcd_request_duration_seconds metric.
func Reset() {
etcdRequestLatency.Reset()

View File

@ -0,0 +1,57 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"strings"
"testing"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/testutil"
)
func TestRecordDecodeError(t *testing.T) {
registry := metrics.NewKubeRegistry()
defer registry.Reset()
registry.Register(decodeErrorCounts)
resourceName := "pods"
testedMetrics := "apiserver_storage_decode_errors_total"
testCases := []struct {
desc string
resource string
want string
}{
{
desc: "test success",
resource: resourceName,
want: `
# HELP apiserver_storage_decode_errors_total [ALPHA] Number of stored object decode errors split by object type
# TYPE apiserver_storage_decode_errors_total counter
apiserver_storage_decode_errors_total{resource="pods"} 1
`,
},
}
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
RecordDecodeError(test.resource)
if err := testutil.GatherAndCompare(registry, strings.NewReader(test.want), testedMetrics); err != nil {
t.Fatal(err)
}
})
}
}

View File

@ -156,7 +156,12 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
return storage.NewInternalError(err.Error())
}
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
err = decode(s.codec, s.versioner, data, out, kv.ModRevision)
if err != nil {
recordDecodeError(s.groupResourceString, preparedKey)
return err
}
return nil
}
// Create implements storage.Interface.Create.
@ -220,6 +225,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
err = decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
recordDecodeError(s.groupResourceString, preparedKey)
return err
}
span.AddEvent("decode succeeded", attribute.Int("len", len(data)))
@ -352,7 +358,12 @@ func (s *store) conditionalDelete(
if deleteResp.Header == nil {
return errors.New("invalid DeleteRange response - nil header")
}
return decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
err = decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
if err != nil {
recordDecodeError(s.groupResourceString, key)
return err
}
return nil
}
}
@ -470,7 +481,12 @@ func (s *store) GuaranteedUpdate(
}
// recheck that the data from etcd is not stale before short-circuiting a write
if !origState.stale {
return decode(s.codec, s.versioner, origState.data, destination, origState.rev)
err = decode(s.codec, s.versioner, origState.data, destination, origState.rev)
if err != nil {
recordDecodeError(s.groupResourceString, preparedKey)
return err
}
return nil
}
}
@ -518,6 +534,7 @@ func (s *store) GuaranteedUpdate(
err = decode(s.codec, s.versioner, data, destination, putResp.Header.Revision)
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
recordDecodeError(s.groupResourceString, preparedKey)
return err
}
span.AddEvent("decode succeeded", attribute.Int("len", len(data)))
@ -745,6 +762,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
}
if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil {
recordDecodeError(s.groupResourceString, string(kv.Key))
return err
}
numEvald++
@ -884,6 +902,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
state.data = data
state.stale = stale
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
recordDecodeError(s.groupResourceString, key)
return nil, err
}
}
@ -1022,6 +1041,12 @@ func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.Selec
return nil
}
// recordDecodeError record decode error split by object type.
func recordDecodeError(resource string, key string) {
metrics.RecordDecodeError(resource)
klog.V(4).Infof("Decoding %s \"%s\" failed", resource, key)
}
func notFound(key string) clientv3.Cmp {
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}