From 213861716a4283f5b3badf0fd66f5d4e29292ea6 Mon Sep 17 00:00:00 2001 From: Krzysztof Siedlecki Date: Tue, 22 Jan 2019 10:57:29 +0100 Subject: [PATCH] adding trace to reflector initialization Kubernetes-commit: 21334f1f28df3c7f3408f0933ddd19cb7c64a3ea --- tools/cache/reflector.go | 86 ++++++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 38 deletions(-) diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index e6be2308..46af0221 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -40,6 +40,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + utiltrace "k8s.io/apiserver/pkg/util/trace" "k8s.io/klog" ) @@ -175,46 +176,56 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { options := metav1.ListOptions{ResourceVersion: "0"} r.metrics.numberOfLists.Inc() start := r.clock.Now() - - var list runtime.Object - var err error - listCh := make(chan struct{}, 1) - panicCh := make(chan interface{}, 1) - go func() { - defer func() { - if r := recover(); r != nil { - panicCh <- r - } + if err := func() error { + trace := utiltrace.New("Reflector_" + r.name + "_ListAndWatch") + defer trace.LogIfLong(10 * time.Second) + var list runtime.Object + var err error + listCh := make(chan struct{}, 1) + panicCh := make(chan interface{}, 1) + go func() { + defer func() { + if r := recover(); r != nil { + panicCh <- r + } + }() + list, err = r.listerWatcher.List(options) + close(listCh) }() - list, err = r.listerWatcher.List(options) - close(listCh) - }() - select { - case <-stopCh: + select { + case <-stopCh: + return nil + case r := <-panicCh: + panic(r) + case <-listCh: + } + if err != nil { + return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) + } + trace.Step("Objects listed") + r.metrics.listDuration.Observe(time.Since(start).Seconds()) + listMetaInterface, err := meta.ListAccessor(list) + if err != nil { + return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) + } + resourceVersion = listMetaInterface.GetResourceVersion() + trace.Step("Resource version extracted") + items, err := meta.ExtractList(list) + if err != nil { + return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) + } + trace.Step("Objects extracted") + r.metrics.numberOfItemsInList.Observe(float64(len(items))) + if err := r.syncWith(items, resourceVersion); err != nil { + return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) + } + trace.Step("SyncWith done") + r.setLastSyncResourceVersion(resourceVersion) + trace.Step("Resource version updated") return nil - case r := <-panicCh: - panic(r) - case <-listCh: + }(); err != nil { + return err } - if err != nil { - return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) - } - - r.metrics.listDuration.Observe(time.Since(start).Seconds()) - listMetaInterface, err := meta.ListAccessor(list) - if err != nil { - return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) - } - resourceVersion = listMetaInterface.GetResourceVersion() - items, err := meta.ExtractList(list) - if err != nil { - return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) - } - r.metrics.numberOfItemsInList.Observe(float64(len(items))) - if err := r.syncWith(items, resourceVersion); err != nil { - return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) - } - r.setLastSyncResourceVersion(resourceVersion) resyncerrc := make(chan error, 1) cancelCh := make(chan struct{}) @@ -285,7 +296,6 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } return nil } - if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { if err != errorStopRequested { klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)