diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 163eb111bcb..3730b58b83b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -110,7 +110,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob pathPrefix: path.Join("/", prefix), groupResource: groupResource, groupResourceString: groupResource.String(), - watcher: newWatcher(c, codec, newFunc, versioner, transformer), + watcher: newWatcher(c, codec, groupResource, newFunc, versioner, transformer), leaseManager: newDefaultLeaseManager(c, leaseManagerConfig), } return result @@ -126,7 +126,7 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou key = path.Join(s.pathPrefix, key) startTime := time.Now() getResp, err := s.client.KV.Get(ctx, key) - metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) + metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime) if err != nil { return err } @@ -156,6 +156,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)}, utiltrace.Field{"key", key}, utiltrace.Field{"type", getTypeName(obj)}, + utiltrace.Field{"resource", s.groupResourceString}, ) defer trace.LogIfLong(500 * time.Millisecond) if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 { @@ -189,7 +190,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ).Then( clientv3.OpPut(key, string(newData), opts...), ).Commit() - metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) + metrics.RecordEtcdRequestLatency("create", s.groupResourceString, startTime) trace.Step("Txn call finished", utiltrace.Field{"err", err}) if err != nil { return err @@ -226,7 +227,7 @@ func (s *store) conditionalDelete( getCurrentState := func() (*objState, error) { startTime := time.Now() getResp, err := s.client.KV.Get(ctx, key) - metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) + metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime) if err != nil { return nil, err } @@ -308,7 +309,7 @@ func (s *store) conditionalDelete( ).Else( clientv3.OpGet(key), ).Commit() - metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) + metrics.RecordEtcdRequestLatency("delete", s.groupResourceString, startTime) if err != nil { return err } @@ -333,7 +334,8 @@ func (s *store) GuaranteedUpdate( trace := utiltrace.New("GuaranteedUpdate etcd3", utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)}, utiltrace.Field{"key", key}, - utiltrace.Field{"type", getTypeName(destination)}) + utiltrace.Field{"type", getTypeName(destination)}, + utiltrace.Field{"resource", s.groupResourceString}) defer trace.LogIfLong(500 * time.Millisecond) v, err := conversion.EnforcePtr(destination) @@ -345,7 +347,7 @@ func (s *store) GuaranteedUpdate( getCurrentState := func() (*objState, error) { startTime := time.Now() getResp, err := s.client.KV.Get(ctx, key) - metrics.RecordEtcdRequestLatency("get", getTypeName(destination), startTime) + metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime) if err != nil { return nil, err } @@ -459,7 +461,7 @@ func (s *store) GuaranteedUpdate( ).Else( clientv3.OpGet(key), ).Commit() - metrics.RecordEtcdRequestLatency("update", getTypeName(destination), startTime) + metrics.RecordEtcdRequestLatency("update", s.groupResourceString, startTime) trace.Step("Txn call finished", utiltrace.Field{"err", err}) if err != nil { return err @@ -659,9 +661,9 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption startTime := time.Now() getResp, err = s.client.KV.Get(ctx, key, options...) if recursive { - metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime) + metrics.RecordEtcdRequestLatency("list", s.groupResourceString, startTime) } else { - metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) + metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime) } if err != nil { return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index 3241970f6e7..afb79662cdb 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -28,6 +28,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3/metrics" @@ -69,12 +70,13 @@ func TestOnlySetFatalOnDecodeError(b bool) { } type watcher struct { - client *clientv3.Client - codec runtime.Codec - newFunc func() runtime.Object - objectType string - versioner storage.Versioner - transformer value.Transformer + client *clientv3.Client + codec runtime.Codec + newFunc func() runtime.Object + objectType string + groupResource schema.GroupResource + versioner storage.Versioner + transformer value.Transformer } // watchChan implements watch.Interface. @@ -92,13 +94,14 @@ type watchChan struct { errChan chan error } -func newWatcher(client *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher { +func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource schema.GroupResource, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher { res := &watcher{ - client: client, - codec: codec, - newFunc: newFunc, - versioner: versioner, - transformer: transformer, + client: client, + codec: codec, + groupResource: groupResource, + newFunc: newFunc, + versioner: versioner, + transformer: transformer, } if newFunc == nil { res.objectType = "" @@ -259,7 +262,7 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) { } if wres.IsProgressNotify() { wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision())) - metrics.RecordEtcdBookmark(wc.watcher.objectType) + metrics.RecordEtcdBookmark(wc.watcher.groupResource.String()) continue } @@ -292,7 +295,7 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) { continue } if len(wc.resultChan) == outgoingBufSize { - klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType) + klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource) } // If user couldn't receive results fast enough, we also block incoming events from watcher. // Because storing events in local will cause more memory usage. @@ -411,7 +414,7 @@ func (wc *watchChan) sendError(err error) { func (wc *watchChan) sendEvent(e *event) { if len(wc.incomingEventChan) == incomingBufSize { - klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize, "objectType", wc.watcher.objectType) + klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource) } select { case wc.incomingEventChan <- e: