mirror of
https://github.com/kubernetes/client-go.git
synced 2025-08-14 05:25:33 +00:00
add reflector metrics
Kubernetes-commit: 151d39682e62b288c247d8174a5f7fb139ee7bd1
This commit is contained in:
parent
953296ece8
commit
a335243fc8
1
tools/cache/BUILD
vendored
1
tools/cache/BUILD
vendored
@ -56,6 +56,7 @@ go_library(
|
|||||||
"mutation_cache.go",
|
"mutation_cache.go",
|
||||||
"mutation_detector.go",
|
"mutation_detector.go",
|
||||||
"reflector.go",
|
"reflector.go",
|
||||||
|
"reflector_metrics.go",
|
||||||
"shared_informer.go",
|
"shared_informer.go",
|
||||||
"store.go",
|
"store.go",
|
||||||
"thread_safe_store.go",
|
"thread_safe_store.go",
|
||||||
|
30
tools/cache/reflector.go
vendored
30
tools/cache/reflector.go
vendored
@ -48,6 +48,8 @@ import (
|
|||||||
type Reflector struct {
|
type Reflector struct {
|
||||||
// name identifies this reflector. By default it will be a file:line if possible.
|
// name identifies this reflector. By default it will be a file:line if possible.
|
||||||
name string
|
name string
|
||||||
|
// metrics tracks basic metric information about the reflector
|
||||||
|
metrics *reflectorMetrics
|
||||||
|
|
||||||
// The type of object we expect to place in the store.
|
// The type of object we expect to place in the store.
|
||||||
expectedType reflect.Type
|
expectedType reflect.Type
|
||||||
@ -99,7 +101,9 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn
|
|||||||
// NewNamedReflector same as NewReflector, but with a specified name for logging
|
// NewNamedReflector same as NewReflector, but with a specified name for logging
|
||||||
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
|
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
|
||||||
r := &Reflector{
|
r := &Reflector{
|
||||||
name: name,
|
name: name,
|
||||||
|
// we need this to be unique per process (some names are still the same)but obvious who it belongs to
|
||||||
|
metrics: newReflectorMetrics(makeValidPromethusMetricName(fmt.Sprintf("reflector_"+name+"_%07d", rand.Intn(1000000)))),
|
||||||
listerWatcher: lw,
|
listerWatcher: lw,
|
||||||
store: store,
|
store: store,
|
||||||
expectedType: reflect.TypeOf(expectedType),
|
expectedType: reflect.TypeOf(expectedType),
|
||||||
@ -110,6 +114,11 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeValidPromethusMetricName(in string) string {
|
||||||
|
// this isn't perfect, but it removes our common characters
|
||||||
|
return strings.NewReplacer("/", "_", ".", "_", "-", "_").Replace(in)
|
||||||
|
}
|
||||||
|
|
||||||
// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
|
// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
|
||||||
// call chains to NewReflector, so they'd be low entropy names for reflectors
|
// call chains to NewReflector, so they'd be low entropy names for reflectors
|
||||||
var internalPackages = []string{"client-go/tools/cache/", "/runtime/asm_"}
|
var internalPackages = []string{"client-go/tools/cache/", "/runtime/asm_"}
|
||||||
@ -231,10 +240,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
// to be served from cache and potentially be delayed relative to
|
// to be served from cache and potentially be delayed relative to
|
||||||
// etcd contents. Reflector framework will catch up via Watch() eventually.
|
// etcd contents. Reflector framework will catch up via Watch() eventually.
|
||||||
options := metav1.ListOptions{ResourceVersion: "0"}
|
options := metav1.ListOptions{ResourceVersion: "0"}
|
||||||
|
r.metrics.numberOfLists.Inc()
|
||||||
|
start := r.clock.Now()
|
||||||
list, err := r.listerWatcher.List(options)
|
list, err := r.listerWatcher.List(options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
|
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
|
||||||
}
|
}
|
||||||
|
r.metrics.listDuration.Observe(time.Since(start).Seconds())
|
||||||
listMetaInterface, err := meta.ListAccessor(list)
|
listMetaInterface, err := meta.ListAccessor(list)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
|
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
|
||||||
@ -244,6 +256,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
|
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
|
||||||
}
|
}
|
||||||
|
r.metrics.numberOfItemsInList.Observe(float64(len(items)))
|
||||||
if err := r.syncWith(items, resourceVersion); err != nil {
|
if err := r.syncWith(items, resourceVersion); err != nil {
|
||||||
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
|
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
|
||||||
}
|
}
|
||||||
@ -282,6 +295,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
TimeoutSeconds: &timemoutseconds,
|
TimeoutSeconds: &timemoutseconds,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.metrics.numberOfWatches.Inc()
|
||||||
w, err := r.listerWatcher.Watch(options)
|
w, err := r.listerWatcher.Watch(options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch err {
|
switch err {
|
||||||
@ -333,6 +347,11 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, err
|
|||||||
// Stopping the watcher should be idempotent and if we return from this function there's no way
|
// Stopping the watcher should be idempotent and if we return from this function there's no way
|
||||||
// we're coming back in with the same watch interface.
|
// we're coming back in with the same watch interface.
|
||||||
defer w.Stop()
|
defer w.Stop()
|
||||||
|
// update metrics
|
||||||
|
defer func() {
|
||||||
|
r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
|
||||||
|
r.metrics.watchDuration.Observe(time.Since(start).Seconds())
|
||||||
|
}()
|
||||||
|
|
||||||
loop:
|
loop:
|
||||||
for {
|
for {
|
||||||
@ -388,8 +407,8 @@ loop:
|
|||||||
|
|
||||||
watchDuration := r.clock.Now().Sub(start)
|
watchDuration := r.clock.Now().Sub(start)
|
||||||
if watchDuration < 1*time.Second && eventCount == 0 {
|
if watchDuration < 1*time.Second && eventCount == 0 {
|
||||||
glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
|
r.metrics.numberOfShortWatches.Inc()
|
||||||
return errors.New("very short watch")
|
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
|
||||||
}
|
}
|
||||||
glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
|
glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
|
||||||
return nil
|
return nil
|
||||||
@ -407,4 +426,9 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
|
|||||||
r.lastSyncResourceVersionMutex.Lock()
|
r.lastSyncResourceVersionMutex.Lock()
|
||||||
defer r.lastSyncResourceVersionMutex.Unlock()
|
defer r.lastSyncResourceVersionMutex.Unlock()
|
||||||
r.lastSyncResourceVersion = v
|
r.lastSyncResourceVersion = v
|
||||||
|
|
||||||
|
rv, err := strconv.Atoi(v)
|
||||||
|
if err == nil {
|
||||||
|
r.metrics.lastResourceVersion.Set(float64(rv))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
119
tools/cache/reflector_metrics.go
vendored
Normal file
119
tools/cache/reflector_metrics.go
vendored
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// This file provides abstractions for setting the provider (e.g., prometheus)
|
||||||
|
// of metrics.
|
||||||
|
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GaugeMetric represents a single numerical value that can arbitrarily go up
|
||||||
|
// and down.
|
||||||
|
type GaugeMetric interface {
|
||||||
|
Set(float64)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CounterMetric represents a single numerical value that only ever
|
||||||
|
// goes up.
|
||||||
|
type CounterMetric interface {
|
||||||
|
Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SummaryMetric captures individual observations.
|
||||||
|
type SummaryMetric interface {
|
||||||
|
Observe(float64)
|
||||||
|
}
|
||||||
|
|
||||||
|
type noopMetric struct{}
|
||||||
|
|
||||||
|
func (noopMetric) Inc() {}
|
||||||
|
func (noopMetric) Dec() {}
|
||||||
|
func (noopMetric) Observe(float64) {}
|
||||||
|
func (noopMetric) Set(float64) {}
|
||||||
|
|
||||||
|
type reflectorMetrics struct {
|
||||||
|
numberOfLists CounterMetric
|
||||||
|
listDuration SummaryMetric
|
||||||
|
numberOfItemsInList SummaryMetric
|
||||||
|
|
||||||
|
numberOfWatches CounterMetric
|
||||||
|
numberOfShortWatches CounterMetric
|
||||||
|
watchDuration SummaryMetric
|
||||||
|
numberOfItemsInWatch SummaryMetric
|
||||||
|
|
||||||
|
lastResourceVersion GaugeMetric
|
||||||
|
}
|
||||||
|
|
||||||
|
// MetricsProvider generates various metrics used by the reflector.
|
||||||
|
type MetricsProvider interface {
|
||||||
|
NewListsMetric(name string) CounterMetric
|
||||||
|
NewListDurationMetric(name string) SummaryMetric
|
||||||
|
NewItemsInListMetric(name string) SummaryMetric
|
||||||
|
|
||||||
|
NewWatchesMetric(name string) CounterMetric
|
||||||
|
NewShortWatchesMetric(name string) CounterMetric
|
||||||
|
NewWatchDurationMetric(name string) SummaryMetric
|
||||||
|
NewItemsInWatchMetric(name string) SummaryMetric
|
||||||
|
|
||||||
|
NewLastResourceVersionMetric(name string) GaugeMetric
|
||||||
|
}
|
||||||
|
|
||||||
|
type noopMetricsProvider struct{}
|
||||||
|
|
||||||
|
func (noopMetricsProvider) NewListsMetric(name string) CounterMetric { return noopMetric{} }
|
||||||
|
func (noopMetricsProvider) NewListDurationMetric(name string) SummaryMetric { return noopMetric{} }
|
||||||
|
func (noopMetricsProvider) NewItemsInListMetric(name string) SummaryMetric { return noopMetric{} }
|
||||||
|
func (noopMetricsProvider) NewWatchesMetric(name string) CounterMetric { return noopMetric{} }
|
||||||
|
func (noopMetricsProvider) NewShortWatchesMetric(name string) CounterMetric { return noopMetric{} }
|
||||||
|
func (noopMetricsProvider) NewWatchDurationMetric(name string) SummaryMetric { return noopMetric{} }
|
||||||
|
func (noopMetricsProvider) NewItemsInWatchMetric(name string) SummaryMetric { return noopMetric{} }
|
||||||
|
func (noopMetricsProvider) NewLastResourceVersionMetric(name string) GaugeMetric {
|
||||||
|
return noopMetric{}
|
||||||
|
}
|
||||||
|
|
||||||
|
var metricsFactory = struct {
|
||||||
|
metricsProvider MetricsProvider
|
||||||
|
setProviders sync.Once
|
||||||
|
}{
|
||||||
|
metricsProvider: noopMetricsProvider{},
|
||||||
|
}
|
||||||
|
|
||||||
|
func newReflectorMetrics(name string) *reflectorMetrics {
|
||||||
|
var ret *reflectorMetrics
|
||||||
|
if len(name) == 0 {
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
return &reflectorMetrics{
|
||||||
|
numberOfLists: metricsFactory.metricsProvider.NewListsMetric(name),
|
||||||
|
listDuration: metricsFactory.metricsProvider.NewListDurationMetric(name),
|
||||||
|
numberOfItemsInList: metricsFactory.metricsProvider.NewItemsInListMetric(name),
|
||||||
|
numberOfWatches: metricsFactory.metricsProvider.NewWatchesMetric(name),
|
||||||
|
numberOfShortWatches: metricsFactory.metricsProvider.NewShortWatchesMetric(name),
|
||||||
|
watchDuration: metricsFactory.metricsProvider.NewWatchDurationMetric(name),
|
||||||
|
numberOfItemsInWatch: metricsFactory.metricsProvider.NewItemsInWatchMetric(name),
|
||||||
|
lastResourceVersion: metricsFactory.metricsProvider.NewLastResourceVersionMetric(name),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetReflectorMetricsProvider sets the metrics provider
|
||||||
|
func SetReflectorMetricsProvider(metricsProvider MetricsProvider) {
|
||||||
|
metricsFactory.setProviders.Do(func() {
|
||||||
|
metricsFactory.metricsProvider = metricsProvider
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user