Merge pull request #112042 from ncdc/fix-etcd-unstructured-metrics

etcd3: include GroupResource in logs/metrics
Authored by Kubernetes Prow Robot on 2022-08-25 22:26:11 -07:00; committed by GitHub
commit a14fc3d7a2
2 changed files with 30 additions and 25 deletions
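The substance of the change: every per-resource label that previously came from getTypeName (a reflect-based Go type name) now comes from the GroupResource. For built-in types the two are roughly interchangeable, but all custom resources served through unstructured storage share a single Go type, so their metrics and log lines were indistinguishable from one another. A minimal standalone sketch of the difference, in Go (illustrative, not part of the diff; the example group and resource names are made up):

package main

import (
	"fmt"
	"reflect"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	a := &unstructured.Unstructured{}
	b := &unstructured.Unstructured{}
	// Both print "*unstructured.Unstructured": a type-name label collapses
	// every custom resource into one metrics series.
	fmt.Println(reflect.TypeOf(a).String(), reflect.TypeOf(b).String())

	// GroupResource.String() is "resource.group", unique per resource
	// (just "resource" for the core group).
	fmt.Println(schema.GroupResource{Group: "example.com", Resource: "widgets"}.String()) // widgets.example.com
	fmt.Println(schema.GroupResource{Resource: "pods"}.String())                          // pods
}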

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go

@@ -110,7 +110,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob
 		pathPrefix:          path.Join("/", prefix),
 		groupResource:       groupResource,
 		groupResourceString: groupResource.String(),
-		watcher:             newWatcher(c, codec, newFunc, versioner, transformer),
+		watcher:             newWatcher(c, codec, groupResource, newFunc, versioner, transformer),
 		leaseManager:        newDefaultLeaseManager(c, leaseManagerConfig),
 	}
 	return result
@@ -126,7 +126,7 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
 	key = path.Join(s.pathPrefix, key)
 	startTime := time.Now()
 	getResp, err := s.client.KV.Get(ctx, key)
-	metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
+	metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
 	if err != nil {
 		return err
 	}
@@ -156,6 +156,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
 		utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)},
 		utiltrace.Field{"key", key},
 		utiltrace.Field{"type", getTypeName(obj)},
+		utiltrace.Field{"resource", s.groupResourceString},
 	)
 	defer trace.LogIfLong(500 * time.Millisecond)
 	if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
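As a side note on the new "resource" trace field: a standalone sketch (values made up) of how k8s.io/utils/trace surfaces it. LogIfLong only emits the trace, fields included, when the operation exceeds the threshold:

package main

import (
	"time"

	utiltrace "k8s.io/utils/trace"
)

func main() {
	trace := utiltrace.New("Create etcd3",
		utiltrace.Field{Key: "key", Value: "/registry/widgets/default/foo"},
		utiltrace.Field{Key: "type", Value: "*unstructured.Unstructured"},
		utiltrace.Field{Key: "resource", Value: "widgets.example.com"}, // the field this PR adds
	)
	defer trace.LogIfLong(500 * time.Millisecond)

	time.Sleep(600 * time.Millisecond) // exceed the threshold so the trace is logged
}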
@@ -189,7 +190,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
 	).Then(
 		clientv3.OpPut(key, string(newData), opts...),
 	).Commit()
-	metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
+	metrics.RecordEtcdRequestLatency("create", s.groupResourceString, startTime)
 	trace.Step("Txn call finished", utiltrace.Field{"err", err})
 	if err != nil {
 		return err
@@ -226,7 +227,7 @@ func (s *store) conditionalDelete(
 	getCurrentState := func() (*objState, error) {
 		startTime := time.Now()
 		getResp, err := s.client.KV.Get(ctx, key)
-		metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
+		metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
 		if err != nil {
 			return nil, err
 		}
@@ -308,7 +309,7 @@ func (s *store) conditionalDelete(
 	).Else(
 		clientv3.OpGet(key),
 	).Commit()
-	metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
+	metrics.RecordEtcdRequestLatency("delete", s.groupResourceString, startTime)
 	if err != nil {
 		return err
 	}
@@ -333,7 +334,8 @@ func (s *store) GuaranteedUpdate(
 	trace := utiltrace.New("GuaranteedUpdate etcd3",
 		utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)},
 		utiltrace.Field{"key", key},
-		utiltrace.Field{"type", getTypeName(destination)})
+		utiltrace.Field{"type", getTypeName(destination)},
+		utiltrace.Field{"resource", s.groupResourceString})
 	defer trace.LogIfLong(500 * time.Millisecond)

 	v, err := conversion.EnforcePtr(destination)
@@ -345,7 +347,7 @@ func (s *store) GuaranteedUpdate(
 	getCurrentState := func() (*objState, error) {
 		startTime := time.Now()
 		getResp, err := s.client.KV.Get(ctx, key)
-		metrics.RecordEtcdRequestLatency("get", getTypeName(destination), startTime)
+		metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
 		if err != nil {
 			return nil, err
 		}
@@ -459,7 +461,7 @@ func (s *store) GuaranteedUpdate(
 	).Else(
 		clientv3.OpGet(key),
 	).Commit()
-	metrics.RecordEtcdRequestLatency("update", getTypeName(destination), startTime)
+	metrics.RecordEtcdRequestLatency("update", s.groupResourceString, startTime)
 	trace.Step("Txn call finished", utiltrace.Field{"err", err})
 	if err != nil {
 		return err
@@ -659,9 +661,9 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
 		startTime := time.Now()
 		getResp, err = s.client.KV.Get(ctx, key, options...)
 		if recursive {
-			metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
+			metrics.RecordEtcdRequestLatency("list", s.groupResourceString, startTime)
 		} else {
-			metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
+			metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
 		}
 		if err != nil {
 			return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
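Every call site in this file keeps the same RecordEtcdRequestLatency(verb, resource, startTime) shape; only the resource argument changes. A minimal sketch of the assumed shape of the recorder (the real one lives in k8s.io/apiserver/pkg/storage/etcd3/metrics; the help text and bucket choices here are illustrative, not copied from it):

package metrics

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var etcdRequestLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "etcd_request_duration_seconds",
		Help:    "Etcd request latency in seconds, keyed by operation and resource.",
		Buckets: prometheus.DefBuckets,
	},
	// After this PR the second label carries groupResource.String()
	// (e.g. "deployments.apps") rather than a Go type name.
	[]string{"operation", "type"},
)

func init() {
	prometheus.MustRegister(etcdRequestLatency)
}

// RecordEtcdRequestLatency mirrors the call sites above: verb is
// "get"/"create"/"list"/"update"/"delete", resource is s.groupResourceString.
func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
	etcdRequestLatency.WithLabelValues(verb, resource).Observe(time.Since(startTime).Seconds())
}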

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watch.go

@@ -28,6 +28,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/apiserver/pkg/storage/etcd3/metrics"
@@ -69,12 +70,13 @@ func TestOnlySetFatalOnDecodeError(b bool) {
 }

 type watcher struct {
-	client      *clientv3.Client
-	codec       runtime.Codec
-	newFunc     func() runtime.Object
-	objectType  string
-	versioner   storage.Versioner
-	transformer value.Transformer
+	client        *clientv3.Client
+	codec         runtime.Codec
+	newFunc       func() runtime.Object
+	objectType    string
+	groupResource schema.GroupResource
+	versioner     storage.Versioner
+	transformer   value.Transformer
 }

 // watchChan implements watch.Interface.
@@ -92,13 +94,14 @@ type watchChan struct {
 	errChan           chan error
 }

-func newWatcher(client *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher {
+func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource schema.GroupResource, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher {
 	res := &watcher{
-		client:      client,
-		codec:       codec,
-		newFunc:     newFunc,
-		versioner:   versioner,
-		transformer: transformer,
+		client:        client,
+		codec:         codec,
+		groupResource: groupResource,
+		newFunc:       newFunc,
+		versioner:     versioner,
+		transformer:   transformer,
 	}
 	if newFunc == nil {
 		res.objectType = "<unknown>"
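The constructor keeps the newFunc-based objectType fallback: with no newFunc the watcher can only call itself "<unknown>", which is exactly the case where the new groupResource field carries the real identity. A hypothetical sketch of that contrast (the describe helper is made up for illustration; the fallback logic mirrors newWatcher above):

package main

import (
	"fmt"
	"reflect"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func describe(gr schema.GroupResource, newFunc func() runtime.Object) (objectType, resource string) {
	// Mirrors the fallback in newWatcher: without a newFunc there is no
	// object to reflect on, so the type name degrades to "<unknown>".
	if newFunc == nil {
		objectType = "<unknown>"
	} else {
		objectType = reflect.TypeOf(newFunc()).String()
	}
	return objectType, gr.String()
}

func main() {
	gr := schema.GroupResource{Group: "example.com", Resource: "widgets"}
	objectType, resource := describe(gr, nil)
	fmt.Println(objectType, resource) // "<unknown> widgets.example.com"
}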
@@ -259,7 +262,7 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
 		}
 		if wres.IsProgressNotify() {
 			wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision()))
-			metrics.RecordEtcdBookmark(wc.watcher.objectType)
+			metrics.RecordEtcdBookmark(wc.watcher.groupResource.String())
 			continue
 		}
@@ -292,7 +295,7 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
 			continue
 		}
 		if len(wc.resultChan) == outgoingBufSize {
-			klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType)
+			klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
 		}
 		// If user couldn't receive results fast enough, we also block incoming events from watcher.
 		// Because storing events in local will cause more memory usage.
@@ -411,7 +414,7 @@ func (wc *watchChan) sendError(err error) {
 func (wc *watchChan) sendEvent(e *event) {
 	if len(wc.incomingEventChan) == incomingBufSize {
-		klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize, "objectType", wc.watcher.objectType)
+		klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
 	}
 	select {
 	case wc.incomingEventChan <- e:
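Both watcher warnings keep "objectType" and gain "groupResource". A standalone sketch of how klog renders the new key (flag handling and values are illustrative); schema.GroupResource implements fmt.Stringer, so the value prints as "resource.group":

package main

import (
	"flag"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/klog/v2"
)

func main() {
	klog.InitFlags(nil)
	flag.Set("v", "3") // enable the V(3) guard; error ignored for brevity
	flag.Parse()

	gr := schema.GroupResource{Group: "example.com", Resource: "widgets"}
	klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers",
		"outgoingEvents", 100,
		"objectType", "*unstructured.Unstructured",
		"groupResource", gr) // rendered via gr.String(): "widgets.example.com"
	klog.Flush()
}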