etcd3: include GroupResource in logs/metrics

Use GroupResource instead of object reflection when recording the
following metrics:

- etcd_request_duration_seconds
- etcd_bookmark_counts

Add GroupResource to logs and traces where only reflection-based typing
was previously used.

Both of these changes allow us to distinguish between different CRDs,
all of which are represented as *unstructured.Unstructured.
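For illustration only — this sketch is not part of the commit, and the example group/resource names are invented — the reflection-based helper used in the diff below (getTypeName) reduces to a Go type name, which is identical for every CRD, while the GroupResource string stays distinct per CRD:

```go
package main

import (
	"fmt"
	"reflect"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// Two different CRDs: both are decoded into the same Go type.
	widget := &unstructured.Unstructured{}
	gadget := &unstructured.Unstructured{}

	// Reflection yields the same label for both, so their metrics and log
	// entries collapse into one bucket.
	fmt.Println(reflect.TypeOf(widget).String()) // *unstructured.Unstructured
	fmt.Println(reflect.TypeOf(gadget).String()) // *unstructured.Unstructured

	// The GroupResource the store is created with stays distinct per CRD.
	fmt.Println(schema.GroupResource{Group: "example.com", Resource: "widgets"}.String()) // widgets.example.com
	fmt.Println(schema.GroupResource{Group: "example.com", Resource: "gadgets"}.String()) // gadgets.example.com
}
```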

Signed-off-by: Andy Goldstein <andy.goldstein@redhat.com>
2 changed files with 30 additions and 25 deletions


@@ -110,7 +110,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob
pathPrefix: path.Join("/", prefix),
groupResource: groupResource,
groupResourceString: groupResource.String(),
watcher: newWatcher(c, codec, newFunc, versioner, transformer),
watcher: newWatcher(c, codec, groupResource, newFunc, versioner, transformer),
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
}
return result
@@ -126,7 +126,7 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
key = path.Join(s.pathPrefix, key)
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
if err != nil {
return err
}
@@ -156,6 +156,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)},
utiltrace.Field{"key", key},
utiltrace.Field{"type", getTypeName(obj)},
utiltrace.Field{"resource", s.groupResourceString},
)
defer trace.LogIfLong(500 * time.Millisecond)
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
@@ -189,7 +190,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Commit()
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
metrics.RecordEtcdRequestLatency("create", s.groupResourceString, startTime)
trace.Step("Txn call finished", utiltrace.Field{"err", err})
if err != nil {
return err
@@ -226,7 +227,7 @@ func (s *store) conditionalDelete(
getCurrentState := func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
if err != nil {
return nil, err
}
@@ -308,7 +309,7 @@ func (s *store) conditionalDelete(
).Else(
clientv3.OpGet(key),
).Commit()
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
metrics.RecordEtcdRequestLatency("delete", s.groupResourceString, startTime)
if err != nil {
return err
}
@@ -333,7 +334,8 @@ func (s *store) GuaranteedUpdate(
trace := utiltrace.New("GuaranteedUpdate etcd3",
utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)},
utiltrace.Field{"key", key},
utiltrace.Field{"type", getTypeName(destination)})
utiltrace.Field{"type", getTypeName(destination)},
utiltrace.Field{"resource", s.groupResourceString})
defer trace.LogIfLong(500 * time.Millisecond)
v, err := conversion.EnforcePtr(destination)
@@ -345,7 +347,7 @@ func (s *store) GuaranteedUpdate(
getCurrentState := func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
metrics.RecordEtcdRequestLatency("get", getTypeName(destination), startTime)
metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
if err != nil {
return nil, err
}
@@ -459,7 +461,7 @@ func (s *store) GuaranteedUpdate(
).Else(
clientv3.OpGet(key),
).Commit()
metrics.RecordEtcdRequestLatency("update", getTypeName(destination), startTime)
metrics.RecordEtcdRequestLatency("update", s.groupResourceString, startTime)
trace.Step("Txn call finished", utiltrace.Field{"err", err})
if err != nil {
return err
@@ -659,9 +661,9 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
startTime := time.Now()
getResp, err = s.client.KV.Get(ctx, key, options...)
if recursive {
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
metrics.RecordEtcdRequestLatency("list", s.groupResourceString, startTime)
} else {
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
}
if err != nil {
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)

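As a rough sketch of what this means for the metric (the histogram and label names below are assumptions about how etcd_request_duration_seconds is typically registered, not code from this commit): with the GroupResource string as the label value, each CRD gets its own latency series instead of all sharing one *unstructured.Unstructured bucket.

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Assumed shape of the metric: an operation/type histogram similar to
// etcd_request_duration_seconds.
var etcdRequestLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: "etcd_request_duration_seconds",
		Help: "Etcd request latency in seconds for each operation and object type.",
	},
	[]string{"operation", "type"},
)

func recordEtcdRequestLatency(verb, resource string, startTime time.Time) {
	etcdRequestLatency.WithLabelValues(verb, resource).Observe(time.Since(startTime).Seconds())
}

func main() {
	prometheus.MustRegister(etcdRequestLatency)

	start := time.Now()
	// Before this change the second argument was a reflected type name, so
	// every CRD showed up as {operation="get", type="*unstructured.Unstructured"}.
	// After, it is the GroupResource string, e.g.
	// {operation="get", type="widgets.example.com"}.
	recordEtcdRequestLatency("get", "widgets.example.com", start)
}
```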

@@ -28,6 +28,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
@@ -69,12 +70,13 @@ func TestOnlySetFatalOnDecodeError(b bool) {
}
type watcher struct {
client *clientv3.Client
codec runtime.Codec
newFunc func() runtime.Object
objectType string
versioner storage.Versioner
transformer value.Transformer
client *clientv3.Client
codec runtime.Codec
newFunc func() runtime.Object
objectType string
groupResource schema.GroupResource
versioner storage.Versioner
transformer value.Transformer
}
// watchChan implements watch.Interface.
@@ -92,13 +94,14 @@ type watchChan struct {
errChan chan error
}
func newWatcher(client *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher {
func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource schema.GroupResource, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher {
res := &watcher{
client: client,
codec: codec,
newFunc: newFunc,
versioner: versioner,
transformer: transformer,
client: client,
codec: codec,
groupResource: groupResource,
newFunc: newFunc,
versioner: versioner,
transformer: transformer,
}
if newFunc == nil {
res.objectType = "<unknown>"
@@ -259,7 +262,7 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
}
if wres.IsProgressNotify() {
wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision()))
metrics.RecordEtcdBookmark(wc.watcher.objectType)
metrics.RecordEtcdBookmark(wc.watcher.groupResource.String())
continue
}
@@ -292,7 +295,7 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
continue
}
if len(wc.resultChan) == outgoingBufSize {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType)
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage.
@@ -411,7 +414,7 @@ func (wc *watchChan) sendError(err error) {
func (wc *watchChan) sendEvent(e *event) {
if len(wc.incomingEventChan) == incomingBufSize {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize, "objectType", wc.watcher.objectType)
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
}
select {
case wc.incomingEventChan <- e: