From a335243fc87f388eb13a5ece3e6f3295178ba55e Mon Sep 17 00:00:00 2001 From: deads2k Date: Wed, 28 Jun 2017 16:30:23 -0400 Subject: [PATCH] add reflector metrics Kubernetes-commit: 151d39682e62b288c247d8174a5f7fb139ee7bd1 --- tools/cache/BUILD | 1 + tools/cache/reflector.go | 30 +++++++- tools/cache/reflector_metrics.go | 119 +++++++++++++++++++++++++++++++ 3 files changed, 147 insertions(+), 3 deletions(-) create mode 100644 tools/cache/reflector_metrics.go diff --git a/tools/cache/BUILD b/tools/cache/BUILD index 9b542466..6c5fd44e 100644 --- a/tools/cache/BUILD +++ b/tools/cache/BUILD @@ -56,6 +56,7 @@ go_library( "mutation_cache.go", "mutation_detector.go", "reflector.go", + "reflector_metrics.go", "shared_informer.go", "store.go", "thread_safe_store.go", diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index c09a7865..ca73ba92 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -48,6 +48,8 @@ import ( type Reflector struct { // name identifies this reflector. By default it will be a file:line if possible. name string + // metrics tracks basic metric information about the reflector + metrics *reflectorMetrics // The type of object we expect to place in the store. expectedType reflect.Type @@ -99,7 +101,9 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn // NewNamedReflector same as NewReflector, but with a specified name for logging func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { r := &Reflector{ - name: name, + name: name, + // we need this to be unique per process (some names are still the same)but obvious who it belongs to + metrics: newReflectorMetrics(makeValidPromethusMetricName(fmt.Sprintf("reflector_"+name+"_%07d", rand.Intn(1000000)))), listerWatcher: lw, store: store, expectedType: reflect.TypeOf(expectedType), @@ -110,6 +114,11 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, return r } +func makeValidPromethusMetricName(in string) string { + // this isn't perfect, but it removes our common characters + return strings.NewReplacer("/", "_", ".", "_", "-", "_").Replace(in) +} + // internalPackages are packages that ignored when creating a default reflector name. These packages are in the common // call chains to NewReflector, so they'd be low entropy names for reflectors var internalPackages = []string{"client-go/tools/cache/", "/runtime/asm_"} @@ -231,10 +240,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // to be served from cache and potentially be delayed relative to // etcd contents. Reflector framework will catch up via Watch() eventually. options := metav1.ListOptions{ResourceVersion: "0"} + r.metrics.numberOfLists.Inc() + start := r.clock.Now() list, err := r.listerWatcher.List(options) if err != nil { return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) } + r.metrics.listDuration.Observe(time.Since(start).Seconds()) listMetaInterface, err := meta.ListAccessor(list) if err != nil { return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) @@ -244,6 +256,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { if err != nil { return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) } + r.metrics.numberOfItemsInList.Observe(float64(len(items))) if err := r.syncWith(items, resourceVersion); err != nil { return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) } @@ -282,6 +295,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { TimeoutSeconds: &timemoutseconds, } + r.metrics.numberOfWatches.Inc() w, err := r.listerWatcher.Watch(options) if err != nil { switch err { @@ -333,6 +347,11 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, err // Stopping the watcher should be idempotent and if we return from this function there's no way // we're coming back in with the same watch interface. defer w.Stop() + // update metrics + defer func() { + r.metrics.numberOfItemsInWatch.Observe(float64(eventCount)) + r.metrics.watchDuration.Observe(time.Since(start).Seconds()) + }() loop: for { @@ -388,8 +407,8 @@ loop: watchDuration := r.clock.Now().Sub(start) if watchDuration < 1*time.Second && eventCount == 0 { - glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name) - return errors.New("very short watch") + r.metrics.numberOfShortWatches.Inc() + return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name) } glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount) return nil @@ -407,4 +426,9 @@ func (r *Reflector) setLastSyncResourceVersion(v string) { r.lastSyncResourceVersionMutex.Lock() defer r.lastSyncResourceVersionMutex.Unlock() r.lastSyncResourceVersion = v + + rv, err := strconv.Atoi(v) + if err == nil { + r.metrics.lastResourceVersion.Set(float64(rv)) + } } diff --git a/tools/cache/reflector_metrics.go b/tools/cache/reflector_metrics.go new file mode 100644 index 00000000..0945e5c3 --- /dev/null +++ b/tools/cache/reflector_metrics.go @@ -0,0 +1,119 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file provides abstractions for setting the provider (e.g., prometheus) +// of metrics. + +package cache + +import ( + "sync" +) + +// GaugeMetric represents a single numerical value that can arbitrarily go up +// and down. +type GaugeMetric interface { + Set(float64) +} + +// CounterMetric represents a single numerical value that only ever +// goes up. +type CounterMetric interface { + Inc() +} + +// SummaryMetric captures individual observations. +type SummaryMetric interface { + Observe(float64) +} + +type noopMetric struct{} + +func (noopMetric) Inc() {} +func (noopMetric) Dec() {} +func (noopMetric) Observe(float64) {} +func (noopMetric) Set(float64) {} + +type reflectorMetrics struct { + numberOfLists CounterMetric + listDuration SummaryMetric + numberOfItemsInList SummaryMetric + + numberOfWatches CounterMetric + numberOfShortWatches CounterMetric + watchDuration SummaryMetric + numberOfItemsInWatch SummaryMetric + + lastResourceVersion GaugeMetric +} + +// MetricsProvider generates various metrics used by the reflector. +type MetricsProvider interface { + NewListsMetric(name string) CounterMetric + NewListDurationMetric(name string) SummaryMetric + NewItemsInListMetric(name string) SummaryMetric + + NewWatchesMetric(name string) CounterMetric + NewShortWatchesMetric(name string) CounterMetric + NewWatchDurationMetric(name string) SummaryMetric + NewItemsInWatchMetric(name string) SummaryMetric + + NewLastResourceVersionMetric(name string) GaugeMetric +} + +type noopMetricsProvider struct{} + +func (noopMetricsProvider) NewListsMetric(name string) CounterMetric { return noopMetric{} } +func (noopMetricsProvider) NewListDurationMetric(name string) SummaryMetric { return noopMetric{} } +func (noopMetricsProvider) NewItemsInListMetric(name string) SummaryMetric { return noopMetric{} } +func (noopMetricsProvider) NewWatchesMetric(name string) CounterMetric { return noopMetric{} } +func (noopMetricsProvider) NewShortWatchesMetric(name string) CounterMetric { return noopMetric{} } +func (noopMetricsProvider) NewWatchDurationMetric(name string) SummaryMetric { return noopMetric{} } +func (noopMetricsProvider) NewItemsInWatchMetric(name string) SummaryMetric { return noopMetric{} } +func (noopMetricsProvider) NewLastResourceVersionMetric(name string) GaugeMetric { + return noopMetric{} +} + +var metricsFactory = struct { + metricsProvider MetricsProvider + setProviders sync.Once +}{ + metricsProvider: noopMetricsProvider{}, +} + +func newReflectorMetrics(name string) *reflectorMetrics { + var ret *reflectorMetrics + if len(name) == 0 { + return ret + } + return &reflectorMetrics{ + numberOfLists: metricsFactory.metricsProvider.NewListsMetric(name), + listDuration: metricsFactory.metricsProvider.NewListDurationMetric(name), + numberOfItemsInList: metricsFactory.metricsProvider.NewItemsInListMetric(name), + numberOfWatches: metricsFactory.metricsProvider.NewWatchesMetric(name), + numberOfShortWatches: metricsFactory.metricsProvider.NewShortWatchesMetric(name), + watchDuration: metricsFactory.metricsProvider.NewWatchDurationMetric(name), + numberOfItemsInWatch: metricsFactory.metricsProvider.NewItemsInWatchMetric(name), + lastResourceVersion: metricsFactory.metricsProvider.NewLastResourceVersionMetric(name), + } +} + +// SetReflectorMetricsProvider sets the metrics provider +func SetReflectorMetricsProvider(metricsProvider MetricsProvider) { + metricsFactory.setProviders.Do(func() { + metricsFactory.metricsProvider = metricsProvider + }) +}