From 57d4daeaf49b48d5bed7660bd32cac8ddafe3960 Mon Sep 17 00:00:00 2001 From: Krzysztof Siedlecki Date: Thu, 24 Jan 2019 16:30:47 +0100 Subject: [PATCH] adding trace to reflector initialization Kubernetes-commit: 4e4d380b32105c3ff7641661120da876bca060de --- tools/cache/reflector.go | 84 +++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 36 deletions(-) diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index e6be2308..2a0f7002 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -41,6 +41,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/klog" + "k8s.io/utils/trace" ) // Reflector watches a specified resource and causes all changes to be reflected in the given store. @@ -176,45 +177,56 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { r.metrics.numberOfLists.Inc() start := r.clock.Now() - var list runtime.Object - var err error - listCh := make(chan struct{}, 1) - panicCh := make(chan interface{}, 1) - go func() { - defer func() { - if r := recover(); r != nil { - panicCh <- r - } + if err := func() error { + initTrace := trace.New("Reflector " + r.name + " ListAndWatch") + defer initTrace.LogIfLong(10 * time.Second) + var list runtime.Object + var err error + listCh := make(chan struct{}, 1) + panicCh := make(chan interface{}, 1) + go func() { + defer func() { + if r := recover(); r != nil { + panicCh <- r + } + }() + list, err = r.listerWatcher.List(options) + close(listCh) }() - list, err = r.listerWatcher.List(options) - close(listCh) - }() - select { - case <-stopCh: + select { + case <-stopCh: + return nil + case r := <-panicCh: + panic(r) + case <-listCh: + } + if err != nil { + return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) + } + initTrace.Step("Objects listed") + r.metrics.listDuration.Observe(time.Since(start).Seconds()) + listMetaInterface, err := meta.ListAccessor(list) + if err != nil { + return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) + } + resourceVersion = listMetaInterface.GetResourceVersion() + initTrace.Step("Resource version extracted") + items, err := meta.ExtractList(list) + if err != nil { + return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) + } + initTrace.Step("Objects extracted") + r.metrics.numberOfItemsInList.Observe(float64(len(items))) + if err := r.syncWith(items, resourceVersion); err != nil { + return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) + } + initTrace.Step("SyncWith done") + r.setLastSyncResourceVersion(resourceVersion) + initTrace.Step("Resource version updated") return nil - case r := <-panicCh: - panic(r) - case <-listCh: + }(); err != nil { + return err } - if err != nil { - return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) - } - - r.metrics.listDuration.Observe(time.Since(start).Seconds()) - listMetaInterface, err := meta.ListAccessor(list) - if err != nil { - return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) - } - resourceVersion = listMetaInterface.GetResourceVersion() - items, err := meta.ExtractList(list) - if err != nil { - return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) - } - r.metrics.numberOfItemsInList.Observe(float64(len(items))) - if err := r.syncWith(items, resourceVersion); err != nil { - return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) - } - r.setLastSyncResourceVersion(resourceVersion) resyncerrc := make(chan error, 1) cancelCh := make(chan struct{})