add metrics to gc

Chao Xu
2016-06-30 10:56:41 -07:00
parent 4ef423bfb1
commit 41572cb22d
9 changed files with 293 additions and 23 deletions

View File

@@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/clock"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
@@ -127,7 +128,7 @@ func (m *concurrentUIDToNode) Delete(uid types.UID) {
}
type Propagator struct {
eventQueue *workqueue.Type
eventQueue *workqueue.TimedWorkQueue
// uidToNode doesn't require a lock to protect, because only the
// single-threaded Propagator.processEvent() reads/writes it.
uidToNode *concurrentUIDToNode
@@ -311,7 +312,7 @@ func (gc *GarbageCollector) removeOrphanFinalizer(owner *node) error {
// the "Orphan" finalizer. The node is add back into the orphanQueue if any of
// these steps fail.
func (gc *GarbageCollector) orphanFinalizer() {
key, quit := gc.orphanQueue.Get()
key, start, quit := gc.orphanQueue.Get()
if quit {
return
}
@@ -331,20 +332,21 @@ func (gc *GarbageCollector) orphanFinalizer() {
err := gc.orhpanDependents(owner.identity, dependents)
if err != nil {
glog.V(6).Infof("orphanDependents for %s failed with %v", owner.identity, err)
gc.orphanQueue.Add(owner)
gc.orphanQueue.AddWithTimestamp(owner, start)
return
}
// update the owner, remove "orphaningFinalizer" from its finalizers list
err = gc.removeOrphanFinalizer(owner)
if err != nil {
glog.V(6).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err)
gc.orphanQueue.Add(owner)
gc.orphanQueue.AddWithTimestamp(owner, start)
}
OrphanProcessingLatency.Observe(sinceInMicroseconds(gc.clock, start))
}
// Dequeueing an event from eventQueue, updating graph, populating dirty_queue.
func (p *Propagator) processEvent() {
key, quit := p.eventQueue.Get()
key, start, quit := p.eventQueue.Get()
if quit {
return
}
@@ -422,6 +424,7 @@ func (p *Propagator) processEvent() {
p.gc.dirtyQueue.Add(dep)
}
}
EventProcessingLatency.Observe(sinceInMicroseconds(p.gc.clock, start))
}
// GarbageCollector is responsible for carrying out cascading deletion, and
@@ -434,11 +437,14 @@ type GarbageCollector struct {
metaOnlyClientPool dynamic.ClientPool
// clientPool uses the regular dynamicCodec. We need it to update
// finalizers. It can be removed if we support patching finalizers.
clientPool dynamic.ClientPool
dirtyQueue *workqueue.Type
orphanQueue *workqueue.Type
monitors []monitor
propagator *Propagator
clientPool dynamic.ClientPool
dirtyQueue *workqueue.TimedWorkQueue
orphanQueue *workqueue.TimedWorkQueue
monitors []monitor
propagator *Propagator
clock clock.Clock
registeredRateLimiter *RegisteredRateLimiter
registeredRateLimiterForMonitors *RegisteredRateLimiter
}
func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionResource) *cache.ListWatch {
@@ -466,14 +472,15 @@ func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionReso
}
}
func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversioned.GroupVersionResource, kind unversioned.GroupVersionKind) (monitor, error) {
func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource, kind unversioned.GroupVersionKind) (monitor, error) {
// TODO: consider store in one storage.
glog.V(6).Infof("create storage for resource %s", resource)
var monitor monitor
client, err := p.gc.metaOnlyClientPool.ClientForGroupVersion(resource.GroupVersion())
client, err := gc.metaOnlyClientPool.ClientForGroupVersion(resource.GroupVersion())
if err != nil {
return monitor, err
}
gc.registeredRateLimiterForMonitors.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring")
setObjectTypeMeta := func(obj interface{}) {
runtimeObject, ok := obj.(runtime.Object)
if !ok {
@@ -493,13 +500,13 @@ func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversion
eventType: addEvent,
obj: obj,
}
p.eventQueue.Add(event)
gc.propagator.eventQueue.Add(event)
},
UpdateFunc: func(oldObj, newObj interface{}) {
setObjectTypeMeta(newObj)
setObjectTypeMeta(oldObj)
event := event{updateEvent, newObj, oldObj}
p.eventQueue.Add(event)
gc.propagator.eventQueue.Add(event)
},
DeleteFunc: func(obj interface{}) {
// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
@@ -511,7 +518,7 @@ func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversion
eventType: deleteEvent,
obj: obj,
}
p.eventQueue.Add(event)
gc.propagator.eventQueue.Add(event)
},
},
)
@@ -528,16 +535,20 @@ var ignoredResources = map[unversioned.GroupVersionResource]struct{}{
}
func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
clock := clock.RealClock{}
gc := &GarbageCollector{
metaOnlyClientPool: metaOnlyClientPool,
clientPool: clientPool,
dirtyQueue: workqueue.New(),
orphanQueue: workqueue.New(),
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper: registered.RESTMapper(),
restMapper: registered.RESTMapper(),
clock: clock,
dirtyQueue: workqueue.NewTimedWorkQueue(clock),
orphanQueue: workqueue.NewTimedWorkQueue(clock),
registeredRateLimiter: NewRegisteredRateLimiter(),
registeredRateLimiterForMonitors: NewRegisteredRateLimiter(),
}
gc.propagator = &Propagator{
eventQueue: workqueue.New(),
eventQueue: workqueue.NewTimedWorkQueue(gc.clock),
uidToNode: &concurrentUIDToNode{
RWMutex: &sync.RWMutex{},
uidToNode: make(map[types.UID]*node),
@@ -553,7 +564,7 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
if err != nil {
return nil, err
}
monitor, err := monitorFor(gc.propagator, gc.clientPool, resource, kind)
monitor, err := gc.monitorFor(resource, kind)
if err != nil {
return nil, err
}
@@ -563,7 +574,7 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
}
func (gc *GarbageCollector) worker() {
key, quit := gc.dirtyQueue.Get()
key, start, quit := gc.dirtyQueue.Get()
if quit {
return
}
@@ -572,6 +583,7 @@ func (gc *GarbageCollector) worker() {
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", key, err))
}
DirtyProcessingLatency.Observe(sinceInMicroseconds(gc.clock, start))
}
// apiResource consults the REST mapper to translate an <apiVersion, kind,
@@ -594,6 +606,7 @@ func (gc *GarbageCollector) apiResource(apiVersion, kind string, namespaced bool
func (gc *GarbageCollector) deleteObject(item objectReference) error {
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil {
return err
@@ -607,6 +620,7 @@ func (gc *GarbageCollector) deleteObject(item objectReference) error {
func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructured, error) {
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil {
return nil, err
@@ -617,6 +631,7 @@ func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructur
func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unstructured) (*runtime.Unstructured, error) {
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil {
return nil, err
@@ -627,6 +642,7 @@ func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unst
func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*runtime.Unstructured, error) {
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil {
return nil, err
@@ -752,6 +768,7 @@ func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
go wait.Until(gc.worker, 0, stopCh)
go wait.Until(gc.orphanFinalizer, 0, stopCh)
}
Register()
<-stopCh
glog.Infof("Garbage Collector: Shutting down")
gc.dirtyQueue.ShutDown()
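
The TimedWorkQueue these hunks depend on is presumably added elsewhere in this commit (the file is not shown here). Below is a minimal sketch of the interface the garbage collector code assumes, based only on the calls above (Add, AddWithTimestamp, and Get returning an enqueue timestamp); the field layout and internal names are assumptions, and the real pkg/util/workqueue implementation may differ:

package workqueue

import (
	"time"

	"k8s.io/kubernetes/pkg/util/clock"
)

// timedItem pairs a queued object with the time it first entered the queue.
type timedItem struct {
	start time.Time
	obj   interface{}
}

// TimedWorkQueue wraps the plain FIFO Type so consumers can measure how long
// an item sat in the queue.
type TimedWorkQueue struct {
	*Type
	clock clock.Clock // injected so tests can substitute a fake clock
}

func NewTimedWorkQueue(c clock.Clock) *TimedWorkQueue {
	return &TimedWorkQueue{Type: New(), clock: c}
}

// Add stamps the object with the current time.
func (q *TimedWorkQueue) Add(obj interface{}) {
	q.Type.Add(timedItem{start: q.clock.Now(), obj: obj})
}

// AddWithTimestamp re-queues an object while keeping an earlier timestamp, so
// retries (as in orphanFinalizer above) still count toward the observed latency.
func (q *TimedWorkQueue) AddWithTimestamp(obj interface{}, start time.Time) {
	q.Type.Add(timedItem{start: start, obj: obj})
}

// Get returns the object, the time it was first enqueued, and whether the
// queue has been shut down.
func (q *TimedWorkQueue) Get() (interface{}, time.Time, bool) {
	item, quit := q.Type.Get()
	if quit || item == nil {
		return nil, time.Time{}, quit
	}
	timed := item.(timedItem)
	return timed.obj, timed.start, quit
}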

View File

@@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/clock"
"k8s.io/kubernetes/pkg/util/json"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/workqueue"
@@ -282,13 +283,14 @@ func TestProcessEvent(t *testing.T) {
for _, scenario := range testScenarios {
propagator := &Propagator{
eventQueue: workqueue.New(),
eventQueue: workqueue.NewTimedWorkQueue(clock.RealClock{}),
uidToNode: &concurrentUIDToNode{
RWMutex: &sync.RWMutex{},
uidToNode: make(map[types.UID]*node),
},
gc: &GarbageCollector{
dirtyQueue: workqueue.New(),
dirtyQueue: workqueue.NewTimedWorkQueue(clock.RealClock{}),
clock: clock.RealClock{},
},
}
for i := 0; i < len(scenario.events); i++ {

View File

@@ -0,0 +1,73 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package garbagecollector
import (
"sync"
"time"
"k8s.io/kubernetes/pkg/util/clock"
"github.com/prometheus/client_golang/prometheus"
)
const (
GarbageCollectSubsystem = "garbage_collector"
EventProcessingLatencyKey = "event_processing_latency_microseconds"
DirtyProcessingLatencyKey = "dirty_processing_latency_microseconds"
OrphanProcessingLatencyKey = "orphan_processing_latency_microseconds"
)
var (
EventProcessingLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: GarbageCollectSubsystem,
Name: EventProcessingLatencyKey,
Help: "Time in microseconds of an event spend in the eventQueue",
},
)
DirtyProcessingLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: GarbageCollectSubsystem,
Name: DirtyProcessingLatencyKey,
Help: "Time in microseconds of an item spend in the dirtyQueue",
},
)
OrphanProcessingLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: GarbageCollectSubsystem,
Name: OrphanProcessingLatencyKey,
Help: "Time in microseconds of an item spend in the orphanQueue",
},
)
)
var registerMetrics sync.Once
// Register all metrics.
func Register() {
// Register the metrics.
registerMetrics.Do(func() {
prometheus.MustRegister(EventProcessingLatency)
prometheus.MustRegister(DirtyProcessingLatency)
prometheus.MustRegister(OrphanProcessingLatency)
})
}
func sinceInMicroseconds(clock clock.Clock, start time.Time) float64 {
return float64(clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}
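
Illustrative only, not part of the commit: how Register and sinceInMicroseconds are meant to be combined with the start timestamp handed back by a TimedWorkQueue. Using the FakeClock from pkg/util/clock (an assumption made here purely to get a deterministic number) makes the unit conversion easy to check:

package garbagecollector

import (
	"time"

	"k8s.io/kubernetes/pkg/util/clock"
)

func exampleObserveLatency() {
	Register() // idempotent thanks to sync.Once

	fake := clock.NewFakeClock(time.Now())
	start := fake.Now()                // in real code this comes from dirtyQueue.Get()
	fake.Step(1500 * time.Microsecond) // pretend the item waited 1.5ms in the queue

	// clock.Since(start) is 1500000ns, so the summary records 1500 microseconds.
	DirtyProcessingLatency.Observe(sinceInMicroseconds(fake, start))
}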

View File

@@ -0,0 +1,59 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package garbagecollector
import (
"fmt"
"strings"
"sync"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
"k8s.io/kubernetes/pkg/util/metrics"
)
// RegisteredRateLimiter records the registered RateLimiters to avoid
// duplication.
type RegisteredRateLimiter struct {
rateLimiters map[unversioned.GroupVersion]struct{}
lock sync.RWMutex
}
// NewRegisteredRateLimiter returns a new RegisteredRateLimiter.
func NewRegisteredRateLimiter() *RegisteredRateLimiter {
return &RegisteredRateLimiter{
rateLimiters: make(map[unversioned.GroupVersion]struct{}),
}
}
func (r *RegisteredRateLimiter) registerIfNotPresent(gv unversioned.GroupVersion, client *dynamic.Client, prefix string) {
r.lock.RLock()
_, ok := r.rateLimiters[gv]
r.lock.RUnlock()
if ok {
return
}
r.lock.Lock()
defer r.lock.Unlock()
if _, ok := r.rateLimiters[gv]; !ok {
if rateLimiter := client.GetRateLimiter(); rateLimiter != nil {
group := strings.Replace(gv.Group, ".", ":", -1)
metrics.RegisterMetricAndTrackRateLimiterUsage(fmt.Sprintf("%s_%s_%s", prefix, group, gv.Version), rateLimiter)
}
r.rateLimiters[gv] = struct{}{}
}
}
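
Two notes on the code above. The RLock/Lock sequence is double-checked locking: the common already-registered case only takes the read lock, and the write lock plus re-check runs at most once per GroupVersion. The Replace call swaps dots for colons because Prometheus metric names may contain colons but not dots. A sketch of the resulting name, using a hypothetical group/version not taken from the commit:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical inputs, mirroring the Sprintf in registerIfNotPresent.
	prefix, group, version := "garbage_collector_operation", "batch.example.com", "v1"
	name := fmt.Sprintf("%s_%s_%s", prefix, strings.Replace(group, ".", ":", -1), version)
	fmt.Println(name) // garbage_collector_operation_batch:example:com_v1
}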